This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new fbbb8822034 Merge MigrationJobAPI and MigrationJobManager (#29218)
fbbb8822034 is described below

commit fbbb88220342ca0b1a5da1e22681f2b2f6aa11ee
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Nov 27 21:54:09 2023 +0800

    Merge MigrationJobAPI and MigrationJobManager (#29218)
---
 .../ShowMigrationSourceStorageUnitsExecutor.java   |   9 +-
 .../handler/update/MigrateTableUpdater.java        |  11 +-
 .../RegisterMigrationSourceStorageUnitUpdater.java |   9 +-
 ...nregisterMigrationSourceStorageUnitUpdater.java |   9 +-
 .../migration/api/impl/MigrationJobAPI.java        | 226 +++++++++++++++++-
 .../migration/api/impl/MigrationJobManager.java    | 261 ---------------------
 .../migration/api/impl/MigrationJobAPITest.java    |  20 +-
 7 files changed, 251 insertions(+), 294 deletions(-)

diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
index bab525e6822..ca2d229bb4b 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
@@ -18,11 +18,12 @@
 package org.apache.shardingsphere.migration.distsql.handler.query;
 
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import 
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationSourceStorageUnitsStatement;
 
 import java.util.Arrays;
@@ -36,11 +37,11 @@ import java.util.List;
  */
 public final class ShowMigrationSourceStorageUnitsExecutor implements 
QueryableRALExecutor<ShowMigrationSourceStorageUnitsStatement> {
     
-    private final MigrationJobManager jobManager = new MigrationJobManager(new 
MigrationJobOption());
+    private final MigrationJobAPI jobAPI = (MigrationJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowMigrationSourceStorageUnitsStatement sqlStatement) {
-        Iterator<Collection<Object>> data = 
jobManager.listMigrationSourceResources(new 
PipelineContextKey(InstanceType.PROXY)).iterator();
+        Iterator<Collection<Object>> data = 
jobAPI.listMigrationSourceResources(new 
PipelineContextKey(InstanceType.PROXY)).iterator();
         Collection<LocalDataQueryResultRow> result = new LinkedList<>();
         while (data.hasNext()) {
             result.add(new LocalDataQueryResultRow((List<Object>) 
data.next()));
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
index 23f220d73fb..3d0214752d2 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
@@ -17,29 +17,28 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
-import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequiredTargetDatabaseException;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import 
org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
 
 /**
  * Migrate table updater.
  */
-@Slf4j
 public final class MigrateTableUpdater implements 
RALUpdater<MigrateTableStatement> {
     
-    private final MigrationJobManager jobManager = new MigrationJobManager(new 
MigrationJobOption());
+    private final MigrationJobAPI jobAPI = (MigrationJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
     
     @Override
     public void executeUpdate(final String databaseName, final 
MigrateTableStatement sqlStatement) {
         String targetDatabaseName = null == 
sqlStatement.getTargetDatabaseName() ? databaseName : 
sqlStatement.getTargetDatabaseName();
         ShardingSpherePreconditions.checkNotNull(targetDatabaseName, 
MissingRequiredTargetDatabaseException::new);
-        jobManager.start(new PipelineContextKey(InstanceType.PROXY), new 
MigrateTableStatement(sqlStatement.getSourceTargetEntries(), 
targetDatabaseName));
+        jobAPI.start(new PipelineContextKey(InstanceType.PROXY), new 
MigrateTableStatement(sqlStatement.getSourceTargetEntries(), 
targetDatabaseName));
     }
     
     @Override
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
index d26fee0d0a1..346d76ff1c0 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
@@ -18,8 +18,8 @@
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import 
org.apache.shardingsphere.distsql.handler.validate.DataSourcePoolPropertiesValidateHandler;
 import org.apache.shardingsphere.distsql.segment.DataSourceSegment;
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePo
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import 
org.apache.shardingsphere.migration.distsql.statement.RegisterMigrationSourceStorageUnitStatement;
 
 import java.util.ArrayList;
@@ -43,7 +44,7 @@ import java.util.Map;
  */
 public final class RegisterMigrationSourceStorageUnitUpdater implements 
RALUpdater<RegisterMigrationSourceStorageUnitStatement> {
     
-    private final MigrationJobManager jobManager = new MigrationJobManager(new 
MigrationJobOption());
+    private final MigrationJobAPI jobAPI = (MigrationJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
     
     private final DataSourcePoolPropertiesValidateHandler validateHandler = 
new DataSourcePoolPropertiesValidateHandler();
     
@@ -56,7 +57,7 @@ public final class RegisterMigrationSourceStorageUnitUpdater implements RALUpdat
         DatabaseType databaseType = 
DatabaseTypeFactory.get(urlBasedDataSourceSegment.getUrl());
         Map<String, DataSourcePoolProperties> propsMap = 
DataSourceSegmentsConverter.convert(databaseType, dataSources);
         validateHandler.validate(propsMap);
-        jobManager.addMigrationSourceResources(new 
PipelineContextKey(InstanceType.PROXY), propsMap);
+        jobAPI.addMigrationSourceResources(new 
PipelineContextKey(InstanceType.PROXY), propsMap);
     }
     
     @Override
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
index 3f4cdfe20d2..77d6f4e3b34 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
@@ -18,10 +18,11 @@
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import 
org.apache.shardingsphere.migration.distsql.statement.UnregisterMigrationSourceStorageUnitStatement;
 
 /**
@@ -29,11 +30,11 @@ import org.apache.shardingsphere.migration.distsql.statement.UnregisterMigration
  */
 public final class UnregisterMigrationSourceStorageUnitUpdater implements 
RALUpdater<UnregisterMigrationSourceStorageUnitStatement> {
     
-    private final MigrationJobManager jobManager = new MigrationJobManager(new 
MigrationJobOption());
+    private final MigrationJobAPI jobAPI = (MigrationJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
     
     @Override
     public void executeUpdate(final String databaseName, final 
UnregisterMigrationSourceStorageUnitStatement sqlStatement) {
-        jobManager.dropMigrationSourceResources(new 
PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames());
+        jobAPI.dropMigrationSourceResources(new 
PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames());
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index b89e63824d3..254aa881369 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -18,27 +18,73 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl;
 
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
+import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
+import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
+import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import 
org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
+import 
org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
+import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
+import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import 
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
+import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.json.JsonUtils;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
+import 
org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
+import 
org.apache.shardingsphere.migration.distsql.statement.pojo.SourceTargetEntry;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 /**
  * Migration job API.
@@ -46,11 +92,185 @@ import java.util.Collection;
 @Slf4j
 public final class MigrationJobAPI implements TransmissionJobAPI {
     
+    private final PipelineDataSourcePersistService dataSourcePersistService = 
new PipelineDataSourcePersistService();
+    
+    private final PipelineJobOption jobOption = new MigrationJobOption();
+    
+    /**
+     * Start migration job.
+     *
+     * @param contextKey context key
+     * @param param create migration job parameter
+     * @return job id
+     */
+    public String start(final PipelineContextKey contextKey, final 
MigrateTableStatement param) {
+        MigrationJobConfiguration jobConfig = new 
YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey,
 param));
+        new PipelineJobManager(jobOption).start(jobConfig);
+        return jobConfig.getJobId();
+    }
+    
+    private YamlMigrationJobConfiguration buildYamlJobConfiguration(final 
PipelineContextKey contextKey, final MigrateTableStatement param) {
+        YamlMigrationJobConfiguration result = new 
YamlMigrationJobConfiguration();
+        result.setTargetDatabaseName(param.getTargetDatabaseName());
+        Map<String, DataSourcePoolProperties> metaDataDataSource = 
dataSourcePersistService.load(contextKey, "MIGRATION");
+        Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>();
+        Map<String, YamlPipelineDataSourceConfiguration> configSources = new 
LinkedHashMap<>();
+        List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new 
HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
+                .thenComparing(each -> 
DataNodeUtils.formatWithSchema(each.getSource()))).collect(Collectors.toList());
+        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new 
YamlDataSourceConfigurationSwapper();
+        for (SourceTargetEntry each : sourceTargetEntries) {
+            sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> 
new LinkedList<>()).add(each.getSource());
+            ShardingSpherePreconditions.checkState(1 == 
sourceDataNodes.get(each.getTargetTableName()).size(),
+                    () -> new PipelineInvalidParameterException("more than one 
source table for " + each.getTargetTableName()));
+            String dataSourceName = each.getSource().getDataSourceName();
+            if (configSources.containsKey(dataSourceName)) {
+                continue;
+            }
+            
ShardingSpherePreconditions.checkState(metaDataDataSource.containsKey(dataSourceName),
+                    () -> new PipelineInvalidParameterException(dataSourceName 
+ " doesn't exist. Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it."));
+            Map<String, Object> sourceDataSourcePoolProps = 
dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName));
+            StandardPipelineDataSourceConfiguration sourceDataSourceConfig = 
new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps);
+            configSources.put(dataSourceName, 
buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), 
sourceDataSourceConfig.getParameter()));
+            DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(sourceDataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
+            if (null == each.getSource().getSchemaName() && 
dialectDatabaseMetaData.isSchemaAvailable()) {
+                
each.getSource().setSchemaName(PipelineSchemaUtils.getDefaultSchema(sourceDataSourceConfig));
+            }
+            DatabaseType sourceDatabaseType = 
sourceDataSourceConfig.getDatabaseType();
+            if (null == result.getSourceDatabaseType()) {
+                result.setSourceDatabaseType(sourceDatabaseType.getType());
+            } else if 
(!result.getSourceDatabaseType().equals(sourceDatabaseType.getType())) {
+                throw new PipelineInvalidParameterException("Source storage 
units have different database types");
+            }
+        }
+        result.setSources(configSources);
+        ShardingSphereDatabase targetDatabase = 
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName());
+        PipelineDataSourceConfiguration targetPipelineDataSourceConfig = 
buildTargetPipelineDataSourceConfiguration(targetDatabase);
+        
result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(),
 targetPipelineDataSourceConfig.getParameter()));
+        
result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
+        List<JobDataNodeEntry> tablesFirstDataNodes = 
sourceDataNodes.entrySet().stream()
+                .map(entry -> new JobDataNodeEntry(entry.getKey(), 
entry.getValue().subList(0, 1))).collect(Collectors.toList());
+        result.setTargetTableNames(new 
ArrayList<>(sourceDataNodes.keySet()).stream().sorted().collect(Collectors.toList()));
+        
result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
+        result.setTablesFirstDataNodes(new 
JobDataNodeLine(tablesFirstDataNodes).marshal());
+        
result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
+        ((TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobType.class, 
getType()).getOption()).extendYamlJobConfiguration(contextKey, result);
+        return result;
+    }
+    
+    private YamlPipelineDataSourceConfiguration 
buildYamlPipelineDataSourceConfiguration(final String type, final String param) 
{
+        YamlPipelineDataSourceConfiguration result = new 
YamlPipelineDataSourceConfiguration();
+        result.setType(type);
+        result.setParameter(param);
+        return result;
+    }
+    
+    private PipelineDataSourceConfiguration 
buildTargetPipelineDataSourceConfiguration(final ShardingSphereDatabase 
targetDatabase) {
+        Map<String, Map<String, Object>> targetPoolProps = new HashMap<>();
+        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new 
YamlDataSourceConfigurationSwapper();
+        for (Entry<String, StorageUnit> entry : 
targetDatabase.getResourceMetaData().getStorageUnits().entrySet()) {
+            targetPoolProps.put(entry.getKey(), 
dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
+        }
+        YamlRootConfiguration targetRootConfig = 
buildYamlRootConfiguration(targetDatabase.getName(), targetPoolProps, 
targetDatabase.getRuleMetaData().getConfigurations());
+        return new 
ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
+    }
+    
+    private YamlRootConfiguration buildYamlRootConfiguration(final String 
databaseName, final Map<String, Map<String, Object>> yamlDataSources, final 
Collection<RuleConfiguration> rules) {
+        ShardingSpherePreconditions.checkState(!rules.isEmpty(), () -> new 
NoAnyRuleExistsException(databaseName));
+        YamlRootConfiguration result = new YamlRootConfiguration();
+        result.setDatabaseName(databaseName);
+        result.setDataSources(yamlDataSources);
+        result.setRules(new 
YamlRuleConfigurationSwapperEngine().swapToYamlRuleConfigurations(rules));
+        return result;
+    }
+    
+    private Map<String, String> buildTargetTableSchemaMap(final Map<String, 
List<DataNode>> sourceDataNodes) {
+        Map<String, String> result = new LinkedHashMap<>();
+        sourceDataNodes.forEach((tableName, dataNodes) -> 
result.put(tableName, dataNodes.get(0).getSchemaName()));
+        return result;
+    }
+    
+    /**
+     * Add migration source resources.
+     *
+     * @param contextKey context key
+     * @param propsMap data source pool properties map
+     */
+    public void addMigrationSourceResources(final PipelineContextKey 
contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
+        Map<String, DataSourcePoolProperties> existDataSources = 
dataSourcePersistService.load(contextKey, getType());
+        Collection<String> duplicateDataSourceNames = new 
HashSet<>(propsMap.size(), 1F);
+        for (Entry<String, DataSourcePoolProperties> entry : 
propsMap.entrySet()) {
+            if (existDataSources.containsKey(entry.getKey())) {
+                duplicateDataSourceNames.add(entry.getKey());
+            }
+        }
+        
ShardingSpherePreconditions.checkState(duplicateDataSourceNames.isEmpty(), () 
-> new RegisterMigrationSourceStorageUnitException(duplicateDataSourceNames));
+        Map<String, DataSourcePoolProperties> result = new 
LinkedHashMap<>(existDataSources);
+        result.putAll(propsMap);
+        dataSourcePersistService.persist(contextKey, getType(), result);
+    }
+    
+    /**
+     * Drop migration source resources.
+     *
+     * @param contextKey context key
+     * @param resourceNames resource names
+     */
+    public void dropMigrationSourceResources(final PipelineContextKey 
contextKey, final Collection<String> resourceNames) {
+        Map<String, DataSourcePoolProperties> metaDataDataSource = 
dataSourcePersistService.load(contextKey, getType());
+        List<String> noExistResources = resourceNames.stream().filter(each -> 
!metaDataDataSource.containsKey(each)).collect(Collectors.toList());
+        ShardingSpherePreconditions.checkState(noExistResources.isEmpty(), () 
-> new UnregisterMigrationSourceStorageUnitException(noExistResources));
+        for (String each : resourceNames) {
+            metaDataDataSource.remove(each);
+        }
+        dataSourcePersistService.persist(contextKey, getType(), 
metaDataDataSource);
+    }
+    
+    /**
+     * Query migration source resources list.
+     *
+     * @param contextKey context key
+     * @return migration source resources
+     */
+    public Collection<Collection<Object>> listMigrationSourceResources(final 
PipelineContextKey contextKey) {
+        Map<String, DataSourcePoolProperties> propsMap = 
dataSourcePersistService.load(contextKey, getType());
+        Collection<Collection<Object>> result = new 
ArrayList<>(propsMap.size());
+        for (Entry<String, DataSourcePoolProperties> entry : 
propsMap.entrySet()) {
+            String dataSourceName = entry.getKey();
+            DataSourcePoolProperties value = entry.getValue();
+            Collection<Object> props = new LinkedList<>();
+            props.add(dataSourceName);
+            String url = 
String.valueOf(value.getConnectionPropertySynonyms().getStandardProperties().get("url"));
+            DatabaseType databaseType = DatabaseTypeFactory.get(url);
+            props.add(databaseType.getType());
+            ConnectionProperties connectionProps = 
DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, 
databaseType).parse(url, "", null);
+            props.add(connectionProps.getHostname());
+            props.add(connectionProps.getPort());
+            props.add(connectionProps.getCatalog());
+            Map<String, Object> standardProps = 
value.getPoolPropertySynonyms().getStandardProperties();
+            props.add(getStandardProperty(standardProps, 
"connectionTimeoutMilliseconds"));
+            props.add(getStandardProperty(standardProps, 
"idleTimeoutMilliseconds"));
+            props.add(getStandardProperty(standardProps, 
"maxLifetimeMilliseconds"));
+            props.add(getStandardProperty(standardProps, "maxPoolSize"));
+            props.add(getStandardProperty(standardProps, "minPoolSize"));
+            props.add(getStandardProperty(standardProps, "readOnly"));
+            Map<String, Object> otherProps = 
value.getCustomProperties().getProperties();
+            props.add(otherProps.isEmpty() ? "" : 
JsonUtils.toJsonString(otherProps));
+            result.add(props);
+        }
+        return result;
+    }
+    
+    private String getStandardProperty(final Map<String, Object> 
standardProps, final String key) {
+        if (standardProps.containsKey(key) && null != standardProps.get(key)) {
+            return standardProps.get(key).toString();
+        }
+        return "";
+    }
+    
     @Override
     public void commit(final String jobId) {
         log.info("Commit job {}", jobId);
         final long startTimeMillis = System.currentTimeMillis();
-        PipelineJobOption jobOption = new MigrationJobOption();
         PipelineJobManager jobManager = new PipelineJobManager(jobOption);
         jobManager.stop(jobId);
         dropCheckJobs(jobId);
@@ -72,7 +292,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
         final long startTimeMillis = System.currentTimeMillis();
         dropCheckJobs(jobId);
         cleanTempTableOnRollback(jobId);
-        new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, 
getType()).getOption()).drop(jobId);
+        new PipelineJobManager(jobOption).drop(jobId);
         log.info("Rollback job {} cost {} ms", jobId, 
System.currentTimeMillis() - startTimeMillis);
     }
     
@@ -83,7 +303,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
         }
         for (String each : checkJobIds) {
             try {
-                new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, 
getType()).getOption()).drop(each);
+                new PipelineJobManager(jobOption).drop(each);
                 // CHECKSTYLE:OFF
             } catch (final RuntimeException ex) {
                 // CHECKSTYLE:ON
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobManager.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobManager.java
deleted file mode 100644
index f356ed53733..00000000000
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobManager.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl;
-
-import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
-import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
-import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
-import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
-import 
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
-import 
org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
-import 
org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
-import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
-import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-import 
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
-import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
-import org.apache.shardingsphere.infra.util.json.JsonUtils;
-import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
-import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
-import 
org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
-import 
org.apache.shardingsphere.migration.distsql.statement.pojo.SourceTargetEntry;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
-
-/**
- * Migration job manager.
- */
-@Slf4j
-public final class MigrationJobManager {
-    
-    private final MigrationJobOption jobOption;
-    
-    private final PipelineJobManager jobManager;
-    
-    private final PipelineDataSourcePersistService dataSourcePersistService;
-    
-    public MigrationJobManager(final MigrationJobOption jobOption) {
-        this.jobOption = jobOption;
-        jobManager = new PipelineJobManager(jobOption);
-        dataSourcePersistService = new PipelineDataSourcePersistService();
-    }
-    
-    /**
-     * Start migration job.
-     *
-     * @param contextKey context key
-     * @param param create migration job parameter
-     * @return job id
-     */
-    public String start(final PipelineContextKey contextKey, final 
MigrateTableStatement param) {
-        MigrationJobConfiguration jobConfig = new 
YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey,
 param));
-        jobManager.start(jobConfig);
-        return jobConfig.getJobId();
-    }
-    
-    private YamlMigrationJobConfiguration buildYamlJobConfiguration(final 
PipelineContextKey contextKey, final MigrateTableStatement param) {
-        YamlMigrationJobConfiguration result = new 
YamlMigrationJobConfiguration();
-        result.setTargetDatabaseName(param.getTargetDatabaseName());
-        Map<String, DataSourcePoolProperties> metaDataDataSource = 
dataSourcePersistService.load(contextKey, "MIGRATION");
-        Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>();
-        Map<String, YamlPipelineDataSourceConfiguration> configSources = new 
LinkedHashMap<>();
-        List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new 
HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
-                .thenComparing(each -> 
DataNodeUtils.formatWithSchema(each.getSource()))).collect(Collectors.toList());
-        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new 
YamlDataSourceConfigurationSwapper();
-        for (SourceTargetEntry each : sourceTargetEntries) {
-            sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> 
new LinkedList<>()).add(each.getSource());
-            ShardingSpherePreconditions.checkState(1 == 
sourceDataNodes.get(each.getTargetTableName()).size(),
-                    () -> new PipelineInvalidParameterException("more than one 
source table for " + each.getTargetTableName()));
-            String dataSourceName = each.getSource().getDataSourceName();
-            if (configSources.containsKey(dataSourceName)) {
-                continue;
-            }
-            
ShardingSpherePreconditions.checkState(metaDataDataSource.containsKey(dataSourceName),
-                    () -> new PipelineInvalidParameterException(dataSourceName 
+ " doesn't exist. Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it."));
-            Map<String, Object> sourceDataSourcePoolProps = 
dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName));
-            StandardPipelineDataSourceConfiguration sourceDataSourceConfig = 
new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps);
-            configSources.put(dataSourceName, 
buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), 
sourceDataSourceConfig.getParameter()));
-            DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(sourceDataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
-            if (null == each.getSource().getSchemaName() && 
dialectDatabaseMetaData.isSchemaAvailable()) {
-                
each.getSource().setSchemaName(PipelineSchemaUtils.getDefaultSchema(sourceDataSourceConfig));
-            }
-            DatabaseType sourceDatabaseType = 
sourceDataSourceConfig.getDatabaseType();
-            if (null == result.getSourceDatabaseType()) {
-                result.setSourceDatabaseType(sourceDatabaseType.getType());
-            } else if 
(!result.getSourceDatabaseType().equals(sourceDatabaseType.getType())) {
-                throw new PipelineInvalidParameterException("Source storage 
units have different database types");
-            }
-        }
-        result.setSources(configSources);
-        ShardingSphereDatabase targetDatabase = 
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName());
-        PipelineDataSourceConfiguration targetPipelineDataSourceConfig = 
buildTargetPipelineDataSourceConfiguration(targetDatabase);
-        
result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(),
 targetPipelineDataSourceConfig.getParameter()));
-        
result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
-        List<JobDataNodeEntry> tablesFirstDataNodes = 
sourceDataNodes.entrySet().stream()
-                .map(entry -> new JobDataNodeEntry(entry.getKey(), 
entry.getValue().subList(0, 1))).collect(Collectors.toList());
-        result.setTargetTableNames(new 
ArrayList<>(sourceDataNodes.keySet()).stream().sorted().collect(Collectors.toList()));
-        
result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
-        result.setTablesFirstDataNodes(new 
JobDataNodeLine(tablesFirstDataNodes).marshal());
-        
result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
-        jobOption.extendYamlJobConfiguration(contextKey, result);
-        return result;
-    }
-    
-    private YamlPipelineDataSourceConfiguration 
buildYamlPipelineDataSourceConfiguration(final String type, final String param) 
{
-        YamlPipelineDataSourceConfiguration result = new 
YamlPipelineDataSourceConfiguration();
-        result.setType(type);
-        result.setParameter(param);
-        return result;
-    }
-    
-    private PipelineDataSourceConfiguration 
buildTargetPipelineDataSourceConfiguration(final ShardingSphereDatabase 
targetDatabase) {
-        Map<String, Map<String, Object>> targetPoolProps = new HashMap<>();
-        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new 
YamlDataSourceConfigurationSwapper();
-        for (Entry<String, StorageUnit> entry : 
targetDatabase.getResourceMetaData().getStorageUnits().entrySet()) {
-            targetPoolProps.put(entry.getKey(), 
dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
-        }
-        YamlRootConfiguration targetRootConfig = 
buildYamlRootConfiguration(targetDatabase.getName(), targetPoolProps, 
targetDatabase.getRuleMetaData().getConfigurations());
-        return new 
ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
-    }
-    
-    private YamlRootConfiguration buildYamlRootConfiguration(final String 
databaseName, final Map<String, Map<String, Object>> yamlDataSources, final 
Collection<RuleConfiguration> rules) {
-        ShardingSpherePreconditions.checkState(!rules.isEmpty(), () -> new 
NoAnyRuleExistsException(databaseName));
-        YamlRootConfiguration result = new YamlRootConfiguration();
-        result.setDatabaseName(databaseName);
-        result.setDataSources(yamlDataSources);
-        result.setRules(new 
YamlRuleConfigurationSwapperEngine().swapToYamlRuleConfigurations(rules));
-        return result;
-    }
-    
-    private Map<String, String> buildTargetTableSchemaMap(final Map<String, 
List<DataNode>> sourceDataNodes) {
-        Map<String, String> result = new LinkedHashMap<>();
-        sourceDataNodes.forEach((tableName, dataNodes) -> 
result.put(tableName, dataNodes.get(0).getSchemaName()));
-        return result;
-    }
-    
-    /**
-     * Add migration source resources.
-     *
-     * @param contextKey context key
-     * @param propsMap data source pool properties map
-     */
-    public void addMigrationSourceResources(final PipelineContextKey 
contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
-        Map<String, DataSourcePoolProperties> existDataSources = 
dataSourcePersistService.load(contextKey, jobOption.getType());
-        Collection<String> duplicateDataSourceNames = new 
HashSet<>(propsMap.size(), 1F);
-        for (Entry<String, DataSourcePoolProperties> entry : 
propsMap.entrySet()) {
-            if (existDataSources.containsKey(entry.getKey())) {
-                duplicateDataSourceNames.add(entry.getKey());
-            }
-        }
-        
ShardingSpherePreconditions.checkState(duplicateDataSourceNames.isEmpty(), () 
-> new RegisterMigrationSourceStorageUnitException(duplicateDataSourceNames));
-        Map<String, DataSourcePoolProperties> result = new 
LinkedHashMap<>(existDataSources);
-        result.putAll(propsMap);
-        dataSourcePersistService.persist(contextKey, jobOption.getType(), 
result);
-    }
-    
-    /**
-     * Drop migration source resources.
-     *
-     * @param contextKey context key
-     * @param resourceNames resource names
-     */
-    public void dropMigrationSourceResources(final PipelineContextKey 
contextKey, final Collection<String> resourceNames) {
-        Map<String, DataSourcePoolProperties> metaDataDataSource = 
dataSourcePersistService.load(contextKey, jobOption.getType());
-        List<String> noExistResources = resourceNames.stream().filter(each -> 
!metaDataDataSource.containsKey(each)).collect(Collectors.toList());
-        ShardingSpherePreconditions.checkState(noExistResources.isEmpty(), () 
-> new UnregisterMigrationSourceStorageUnitException(noExistResources));
-        for (String each : resourceNames) {
-            metaDataDataSource.remove(each);
-        }
-        dataSourcePersistService.persist(contextKey, jobOption.getType(), 
metaDataDataSource);
-    }
-    
-    /**
-     * Query migration source resources list.
-     *
-     * @param contextKey context key
-     * @return migration source resources
-     */
-    public Collection<Collection<Object>> listMigrationSourceResources(final 
PipelineContextKey contextKey) {
-        Map<String, DataSourcePoolProperties> propsMap = 
dataSourcePersistService.load(contextKey, jobOption.getType());
-        Collection<Collection<Object>> result = new 
ArrayList<>(propsMap.size());
-        for (Entry<String, DataSourcePoolProperties> entry : 
propsMap.entrySet()) {
-            String dataSourceName = entry.getKey();
-            DataSourcePoolProperties value = entry.getValue();
-            Collection<Object> props = new LinkedList<>();
-            props.add(dataSourceName);
-            String url = 
String.valueOf(value.getConnectionPropertySynonyms().getStandardProperties().get("url"));
-            DatabaseType databaseType = DatabaseTypeFactory.get(url);
-            props.add(databaseType.getType());
-            ConnectionProperties connectionProps = 
DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, 
databaseType).parse(url, "", null);
-            props.add(connectionProps.getHostname());
-            props.add(connectionProps.getPort());
-            props.add(connectionProps.getCatalog());
-            Map<String, Object> standardProps = 
value.getPoolPropertySynonyms().getStandardProperties();
-            props.add(getStandardProperty(standardProps, 
"connectionTimeoutMilliseconds"));
-            props.add(getStandardProperty(standardProps, 
"idleTimeoutMilliseconds"));
-            props.add(getStandardProperty(standardProps, 
"maxLifetimeMilliseconds"));
-            props.add(getStandardProperty(standardProps, "maxPoolSize"));
-            props.add(getStandardProperty(standardProps, "minPoolSize"));
-            props.add(getStandardProperty(standardProps, "readOnly"));
-            Map<String, Object> otherProps = 
value.getCustomProperties().getProperties();
-            props.add(otherProps.isEmpty() ? "" : 
JsonUtils.toJsonString(otherProps));
-            result.add(props);
-        }
-        return result;
-    }
-    
-    private String getStandardProperty(final Map<String, Object> 
standardProps, final String key) {
-        if (standardProps.containsKey(key) && null != standardProps.get(key)) {
-            return standardProps.get(key).toString();
-        }
-        return "";
-    }
-}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 32e144ab645..fe0d5bb5576 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -37,10 +37,10 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFacto
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
@@ -96,8 +96,6 @@ class MigrationJobAPITest {
     
     private static MigrationJobAPI jobAPI;
     
-    private static MigrationJobManager migrationJobManager;
-    
     private static PipelineJobConfigurationManager jobConfigManager;
     
     private static PipelineJobManager jobManager;
@@ -112,8 +110,7 @@ class MigrationJobAPITest {
     static void beforeClass() {
         PipelineContextUtils.mockModeConfigAndContextManager();
         jobOption = new MigrationJobOption();
-        jobAPI = new MigrationJobAPI();
-        migrationJobManager = new MigrationJobManager(jobOption);
+        jobAPI = (MigrationJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
         jobConfigManager = new PipelineJobConfigurationManager(jobOption);
         jobManager = new PipelineJobManager(jobOption);
         transmissionJobManager = new TransmissionJobManager(jobOption);
@@ -124,13 +121,12 @@ class MigrationJobAPITest {
         props.put("jdbcUrl", jdbcUrl);
         props.put("username", "root");
         props.put("password", "root");
-        migrationJobManager.addMigrationSourceResources(
-                PipelineContextUtils.getContextKey(), 
Collections.singletonMap("ds_0", new 
DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
+        
jobAPI.addMigrationSourceResources(PipelineContextUtils.getContextKey(), 
Collections.singletonMap("ds_0", new 
DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
     }
     
     @AfterAll
     static void afterClass() {
-        
migrationJobManager.dropMigrationSourceResources(PipelineContextUtils.getContextKey(),
 Collections.singletonList("ds_0"));
+        
jobAPI.dropMigrationSourceResources(PipelineContextUtils.getContextKey(), 
Collections.singletonList("ds_0"));
     }
     
     @Test
@@ -258,20 +254,20 @@ class MigrationJobAPITest {
     void assertCreateJobConfigFailedOnMoreThanOneSourceTable() {
         List<SourceTargetEntry> sourceTargetEntries = Stream.of("t_order_0", 
"t_order_1")
                 .map(each -> new SourceTargetEntry("logic_db", new 
DataNode("ds_0", each), "t_order")).collect(Collectors.toList());
-        assertThrows(PipelineInvalidParameterException.class, () -> 
migrationJobManager.start(PipelineContextUtils.getContextKey(), new 
MigrateTableStatement(sourceTargetEntries, "logic_db")));
+        assertThrows(PipelineInvalidParameterException.class, () -> 
jobAPI.start(PipelineContextUtils.getContextKey(), new 
MigrateTableStatement(sourceTargetEntries, "logic_db")));
     }
     
     @Test
     void assertCreateJobConfigFailedOnDataSourceNotExist() {
         List<SourceTargetEntry> sourceTargetEntries = 
Collections.singletonList(new SourceTargetEntry("logic_db", new 
DataNode("ds_not_exists", "t_order"), "t_order"));
-        assertThrows(PipelineInvalidParameterException.class, () -> 
migrationJobManager.start(PipelineContextUtils.getContextKey(), new 
MigrateTableStatement(sourceTargetEntries, "logic_db")));
+        assertThrows(PipelineInvalidParameterException.class, () -> 
jobAPI.start(PipelineContextUtils.getContextKey(), new 
MigrateTableStatement(sourceTargetEntries, "logic_db")));
     }
     
     @Test
     void assertCreateJobConfig() throws SQLException {
         initIntPrimaryEnvironment();
         SourceTargetEntry sourceTargetEntry = new 
SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
-        String jobId = 
migrationJobManager.start(PipelineContextUtils.getContextKey(), new 
MigrateTableStatement(Collections.singletonList(sourceTargetEntry), 
"logic_db"));
+        String jobId = jobAPI.start(PipelineContextUtils.getContextKey(), new 
MigrateTableStatement(Collections.singletonList(sourceTargetEntry), 
"logic_db"));
         MigrationJobConfiguration actual = 
jobConfigManager.getJobConfiguration(jobId);
         assertThat(actual.getTargetDatabaseName(), is("logic_db"));
         List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
@@ -299,7 +295,7 @@ class MigrationJobAPITest {
     
     @Test
     void assertShowMigrationSourceResources() {
-        Collection<Collection<Object>> actual = 
migrationJobManager.listMigrationSourceResources(PipelineContextUtils.getContextKey());
+        Collection<Collection<Object>> actual = 
jobAPI.listMigrationSourceResources(PipelineContextUtils.getContextKey());
         assertThat(actual.size(), is(1));
         Collection<Object> objects = actual.iterator().next();
         assertThat(objects.toArray()[0], is("ds_0"));

Reply via email to