This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a66b486  Separate sharding related code from scaling-core module part 3 and refactor job configuration structure (#14045)
a66b486 is described below

commit a66b486f7cea7819516f16d779e1d6de3bf88306
Author: Hongsheng Zhong <sand...@126.com>
AuthorDate: Sat Dec 11 10:55:53 2021 +0800

    Separate sharding related code from scaling-core module part 3 and refactor job configuration structure (#14045)
---
 .../ShardingSphereJDBCDataSourceConfiguration.java | 12 +++
 .../typed/StandardJDBCDataSourceConfiguration.java | 30 ++++++--
 .../typed/TypedDataSourceConfiguration.java        | 18 +++++
 .../shardingsphere/infra/datanode/DataNode.java    | 19 +++++
 .../infra/datanode/DataNodeTest.java               | 14 ++++
 .../core/datasource/DataSourceManager.java         |  3 +-
 .../shardingsphere-data-pipeline-spi/pom.xml       |  2 +-
 .../migration/common/api/ScalingWorker.java        |  1 +
 .../job/preparer/AbstractDataSourcePreparer.java   | 66 ++---------------
 .../common/spi/RuleJobConfigurationPreparer.java   |  2 +-
 .../scaling/core/api/impl/ScalingAPIImpl.java      |  8 +-
 .../scaling/core/config/HandleConfiguration.java   | 28 +++++--
 .../scaling/core/config/JobConfiguration.java      | 21 +++---
 .../core/config/internal/JobDataNodeEntry.java     | 86 ++++++++++++++++++++++
 .../core/config/internal/JobDataNodeLine.java      | 74 +++++++++++++++++++
 .../scaling/core/job/JobContext.java               |  2 +-
 .../scaling/core/job/ScalingJob.java               |  2 +-
 .../consistency/DataConsistencyCheckerImpl.java    |  1 +
 .../core/job/preparer/DataSourcePreparer.java      |  6 +-
 ...arer.java => PrepareTargetTablesParameter.java} | 23 +++---
 .../core/job/preparer/ScalingJobPreparer.java      |  5 +-
 .../scaling/core/util/ScalingTaskUtil.java         |  2 +-
 .../ShardingRuleJobConfigurationPreparer.java      | 65 +++++++++-------
 .../preparer/AbstractDataSourcePreparerTest.java   |  4 +-
 .../scaling/core/api/impl/ScalingAPIImplTest.java  |  2 +-
 .../core/config/internal/JobDataNodeEntryTest.java | 44 +++++++++++
 .../core/config/internal/JobDataNodeLineTest.java  | 42 +++++++++++
 .../executor/importer/AbstractImporterTest.java    |  6 +-
 .../core/fixture/FixtureDataSourcePreparer.java    |  4 +-
 .../scaling/core/util/ResourceUtil.java            | 14 +++-
 .../component/checker/MySQLDataSourcePreparer.java | 14 ++--
 .../component/MySQLDataSourcePreparerTest.java     | 26 +++----
 .../checker/OpenGaussDataSourcePreparer.java       | 33 +++++----
 33 files changed, 495 insertions(+), 184 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/ShardingSphereJDBCDataSourceConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/ShardingSphereJDBCDataSourceConfiguration.java
index d339bda..54e2fb5 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/ShardingSphereJDBCDataSourceConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/ShardingSphereJDBCDataSourceConfiguration.java
@@ -98,6 +98,18 @@ public final class ShardingSphereJDBCDataSourceConfiguration extends TypedDataSo
     }
     
     /**
+     * Get actual data source configuration.
+     *
+     * @param actualDataSourceName actual data source name
+     * @return actual data source configuration
+     */
+    public StandardJDBCDataSourceConfiguration getActualDataSourceConfig(final String actualDataSourceName) {
+        Map<String, Object> yamlDataSourceConfig = rootConfig.getDataSources().get(actualDataSourceName);
+        Preconditions.checkNotNull(yamlDataSourceConfig, "actualDataSourceName '{}' does not exist", actualDataSourceName);
+        return new StandardJDBCDataSourceConfiguration(yamlDataSourceConfig);
+    }
+    
+    /**
      * YAML parameter configuration.
      */
     @NoArgsConstructor
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/StandardJDBCDataSourceConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/StandardJDBCDataSourceConfiguration.java
index 7dbf4f5..6eed671 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/StandardJDBCDataSourceConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/StandardJDBCDataSourceConfiguration.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 /**
@@ -53,8 +53,20 @@ public final class StandardJDBCDataSourceConfiguration extends TypedDataSourceCo
     
     @SuppressWarnings("unchecked")
     public StandardJDBCDataSourceConfiguration(final String parameter) {
+        this(YamlEngine.unmarshal(parameter, Map.class), parameter);
+    }
+    
+    /**
+     * Construct by YAML data source configuration.
+     *
+     * @param yamlDataSourceConfig YAML data source configuration, equivalent to {@link DataSourceConfiguration}
+     */
+    public StandardJDBCDataSourceConfiguration(final Map<String, Object> yamlDataSourceConfig) {
+        this(yamlDataSourceConfig, YamlEngine.marshal(yamlDataSourceConfig));
+    }
+    
+    private StandardJDBCDataSourceConfiguration(final Map<String, Object> yamlConfig, final String parameter) {
         this.parameter = parameter;
-        Map<String, Object> yamlConfig = YamlEngine.unmarshal(parameter, Map.class);
         if (!yamlConfig.containsKey(DATA_SOURCE_CLASS_NAME)) {
             yamlConfig.put(DATA_SOURCE_CLASS_NAME, "com.zaxxer.hikari.HikariDataSource");
         }
@@ -83,11 +95,13 @@ public final class StandardJDBCDataSourceConfiguration extends TypedDataSourceCo
         hikariConfig.setJdbcUrl(new JdbcUri(hikariConfig.getJdbcUrl()).appendParameters(parameters));
     }
     
-    private static String wrapParameter(final String jdbcUrl, final String username, final String password) {
-        Map<String, String> parameter = new HashMap<>(3, 1);
-        parameter.put("jdbcUrl", jdbcUrl);
-        parameter.put("username", username);
-        parameter.put("password", password);
-        return YamlEngine.marshal(parameter);
+    private static Map<String, Object> wrapParameter(final String jdbcUrl, final String username, final String password) {
+        Map<String, Object> result = new LinkedHashMap<>(3, 1);
+        result.put("jdbcUrl", jdbcUrl);
+        result.put("username", username);
+        result.put("password", password);
+        return result;
     }
+    
+    // TODO toShardingSphereJDBCDataSource(final String actualDataSourceName, final String logicTableName, final String actualTableName)
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/TypedDataSourceConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/TypedDataSourceConfiguration.java
index a1e8b2b..1473dd1 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/TypedDataSourceConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/TypedDataSourceConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.spi.typed.TypedSPIRegistry;
 import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
 /**
@@ -72,6 +73,23 @@ public abstract class TypedDataSourceConfiguration {
      */
     public abstract DatabaseType getDatabaseType();
     
+    @Override
+    public int hashCode() {
+        return Objects.hash(getType(), getParameter());
+    }
+    
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (null == obj || getClass() != obj.getClass()) {
+            return false;
+        }
+        TypedDataSourceConfiguration that = (TypedDataSourceConfiguration) obj;
+        return Objects.equals(getType(), that.getType()) && Objects.equals(getParameter(), that.getParameter());
+    }
+    
     /**
      * Wrap.
      *
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/datanode/DataNode.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/datanode/DataNode.java
index 2916ee0..e635625 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/datanode/DataNode.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/datanode/DataNode.java
@@ -46,6 +46,7 @@ public final class DataNode {
      * @param dataNode string of data node. use {@code .} to split data source name and table name.
      */
     public DataNode(final String dataNode) {
+        // TODO remove duplicated splitting?
         if (!isValidDataNode(dataNode)) {
             throw new ShardingSphereConfigurationException("Invalid format for actual data nodes: '%s'", dataNode);
         }
@@ -75,4 +76,22 @@ public final class DataNode {
     public int hashCode() {
         return Objects.hashCode(dataSourceName.toUpperCase(), tableName.toUpperCase());
     }
+    
+    /**
+     * Format DataNode as string.
+     *
+     * @return formatted string
+     */
+    public String format() {
+        return dataSourceName + DELIMITER + tableName;
+    }
+    
+    /**
+     * Get formatted text length.
+     *
+     * @return formatted text length
+     */
+    public int getFormattedTextLength() {
+        return dataSourceName.length() + DELIMITER.length() + tableName.length();
+    }
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/datanode/DataNodeTest.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/datanode/DataNodeTest.java
index b6af666..97e2facf 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/datanode/DataNodeTest.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/datanode/DataNodeTest.java
@@ -79,4 +79,18 @@ public final class DataNodeTest {
     public void assertEmptyTableDataNode() {
         new DataNode("ds_0.");
     }
+    
+    @Test
+    public void assertFormat() {
+        String expected = "ds_0.tbl_0";
+        DataNode dataNode = new DataNode(expected);
+        assertThat(dataNode.format(), is(expected));
+    }
+    
+    @Test
+    public void assertFormattedTextLength() {
+        String text = "ds_0.tbl_0";
+        DataNode dataNode = new DataNode(text);
+        assertThat(dataNode.getFormattedTextLength(), is(text.length()));
+    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManager.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManager.java
index 07e9451..e01e2cc 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManager.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManager.java
@@ -22,7 +22,6 @@ import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.infra.config.datasource.typed.TypedDataSourceConfiguration;
 
-import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -71,7 +70,7 @@ public final class DataSourceManager implements AutoCloseable 
{
      * @param dataSourceConfig data source configuration
      * @return data source
      */
-    public DataSource getDataSource(final TypedDataSourceConfiguration dataSourceConfig) {
+    public DataSourceWrapper getDataSource(final TypedDataSourceConfiguration dataSourceConfig) {
         if (cachedDataSources.containsKey(dataSourceConfig)) {
             return cachedDataSources.get(dataSourceConfig);
         }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/pom.xml
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/pom.xml
index 26e77f7..326cc6e 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/pom.xml
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/pom.xml
@@ -25,7 +25,7 @@
         <version>5.0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <artifactId>shardingsphere-data-pipeline-api</artifactId>
+    <artifactId>shardingsphere-data-pipeline-spi</artifactId>
     <name>${project.artifactId}</name>
     
 </project>
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/api/ScalingWorker.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/api/ScalingWorker.java
index bbd976f..692b5cb 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/api/ScalingWorker.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/api/ScalingWorker.java
@@ -109,6 +109,7 @@ public final class ScalingWorker {
         return rootConfig.getRules().stream().filter(each -> each instanceof YamlShardingRuleConfiguration).map(each -> (YamlShardingRuleConfiguration) each).findFirst();
     }
     
+    // TODO more accurate comparison
     private boolean isShardingRulesTheSame(final YamlShardingRuleConfiguration sourceShardingConfig, final YamlShardingRuleConfiguration targetShardingConfig) {
         for (Entry<String, YamlTableRuleConfiguration> entry : sourceShardingConfig.getTables().entrySet()) {
             entry.getValue().setLogicTable(null);
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/job/preparer/AbstractDataSourcePreparer.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/job/preparer/AbstractDataSourcePreparer.java
index b5e1870..d42d487 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/job/preparer/AbstractDataSourcePreparer.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/job/preparer/AbstractDataSourcePreparer.java
@@ -20,34 +20,16 @@ package 
org.apache.shardingsphere.migration.common.job.preparer;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceWrapper;
-import 
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
-import org.apache.shardingsphere.infra.config.datasource.DataSourceConverter;
-import 
org.apache.shardingsphere.infra.config.datasource.typed.ShardingSphereJDBCDataSourceConfiguration;
-import 
org.apache.shardingsphere.infra.config.datasource.typed.TypedDataSourceConfiguration;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-import 
org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
 import 
org.apache.shardingsphere.scaling.core.job.preparer.ActualTableDefinition;
 import org.apache.shardingsphere.scaling.core.job.preparer.DataSourcePreparer;
 import 
org.apache.shardingsphere.scaling.core.job.preparer.TableDefinitionSQLType;
-import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
-import 
org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
-import 
org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
-import org.apache.shardingsphere.sharding.rule.ShardingRule;
-import 
org.apache.shardingsphere.sharding.yaml.swapper.ShardingRuleConfigurationConverter;
 
-import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -67,48 +49,12 @@ public abstract class AbstractDataSourcePreparer implements 
DataSourcePreparer {
     
     private final DataSourceFactory dataSourceFactory = new 
DataSourceFactory();
     
-    protected DataSourceWrapper getSourceDataSource(final JobConfiguration 
jobConfig) {
-        return 
dataSourceFactory.newInstance(jobConfig.getRuleConfig().getSource().unwrap());
+    protected DataSourceWrapper getSourceDataSource(final RuleConfiguration 
ruleConfig) {
+        return dataSourceFactory.newInstance(ruleConfig.getSource().unwrap());
     }
     
-    protected DataSourceWrapper getTargetDataSource(final JobConfiguration 
jobConfig) {
-        return 
dataSourceFactory.newInstance(jobConfig.getRuleConfig().getTarget().unwrap());
-    }
-    
-    protected Collection<String> getLogicTableNames(final 
TypedDataSourceConfiguration sourceConfig) {
-        ShardingSphereJDBCDataSourceConfiguration source = 
(ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
-        ShardingRuleConfiguration ruleConfig = 
ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules());
-        return getLogicTableNames(ruleConfig);
-    }
-    
-    private Collection<String> getLogicTableNames(final 
ShardingRuleConfiguration ruleConfig) {
-        Collection<String> result = new ArrayList<>();
-        List<String> tableNames = 
ruleConfig.getTables().stream().map(ShardingTableRuleConfiguration::getLogicTable).collect(Collectors.toList());
-        List<String> autoTableNames = 
ruleConfig.getAutoTables().stream().map(ShardingAutoTableRuleConfiguration::getLogicTable).collect(Collectors.toList());
-        result.addAll(tableNames);
-        result.addAll(autoTableNames);
-        return result;
-    }
-    
-    /**
-     * Get data source table names map.
-     *
-     * @param sourceConfig source data source configuration
-     * @return data source table names map. map(data source, map(first actual 
table name of logic table, logic table name)).
-     */
-    protected Map<DataSource, Map<String, String>> 
getDataSourceTableNamesMap(final TypedDataSourceConfiguration sourceConfig) {
-        ShardingSphereJDBCDataSourceConfiguration source = 
(ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
-        ShardingRuleConfiguration ruleConfig = 
ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules());
-        Map<String, DataSourceConfiguration> dataSourceConfigs = new 
YamlDataSourceConfigurationSwapper().getDataSourceConfigurations(source.getRootConfig());
-        ShardingRule shardingRule = new ShardingRule(ruleConfig, 
source.getRootConfig().getDataSources().keySet());
-        Collection<String> logicTableNames = getLogicTableNames(ruleConfig);
-        Map<String, Map<String, String>> dataSourceNameTableNamesMap = new 
HashMap<>();
-        for (String each : logicTableNames) {
-            DataNode dataNode = shardingRule.getDataNode(each);
-            
dataSourceNameTableNamesMap.computeIfAbsent(dataNode.getDataSourceName(), key 
-> new LinkedHashMap<>()).put(dataNode.getTableName(), each);
-        }
-        return dataSourceNameTableNamesMap.entrySet().stream().collect(
-                Collectors.toMap(entry -> 
DataSourceConverter.getDataSource(dataSourceConfigs.get(entry.getKey())), 
Entry::getValue, (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
+    protected DataSourceWrapper getTargetDataSource(final RuleConfiguration 
ruleConfig) {
+        return dataSourceFactory.newInstance(ruleConfig.getTarget().unwrap());
     }
     
     protected void executeTargetTableSQL(final Connection targetConnection, 
final String sql) throws SQLException {
@@ -116,7 +62,7 @@ public abstract class AbstractDataSourcePreparer implements 
DataSourcePreparer {
         try (Statement statement = targetConnection.createStatement()) {
             statement.execute(sql);
         } catch (final SQLException ex) {
-            for (String ignoreMessage: IGNORE_EXCEPTION_MESSAGE) {
+            for (String ignoreMessage : IGNORE_EXCEPTION_MESSAGE) {
                 if (ex.getMessage().contains(ignoreMessage)) {
                     return;
                 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/spi/RuleJobConfigurationPreparer.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/spi/RuleJobConfigurationPreparer.java
index 73cdb22..d661d32 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/spi/RuleJobConfigurationPreparer.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/spi/RuleJobConfigurationPreparer.java
@@ -42,7 +42,7 @@ public interface RuleJobConfigurationPreparer extends 
TypedSPI {
      * Convert to handle configuration, used to build job configuration.
      *
      * @param ruleConfig rule configuration
-     * @return handle configuration
+     * @return handle configuration. It won't be used directly, but merge necessary configuration (e.g. shardingTables, logicTables) into final {@link HandleConfiguration}
      */
     HandleConfiguration convertToHandleConfig(RuleConfiguration ruleConfig);
     
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
index 0ba3087..59613f1 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
@@ -86,7 +86,7 @@ public final class ScalingAPIImpl implements ScalingAPI {
         JobConfigurationPOJO jobConfigPOJO = 
getElasticJobConfigPOJO(result.getJobId());
         JobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
         result.setActive(!jobConfigPOJO.isDisabled());
-        
result.setShardingTotalCount(jobConfig.getHandleConfig().getShardingTotalCount());
+        
result.setShardingTotalCount(jobConfig.getHandleConfig().getJobShardingCount());
         result.setTables(jobConfig.getHandleConfig().getLogicTables());
         
result.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time"));
         result.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time"));
@@ -135,7 +135,7 @@ public final class ScalingAPIImpl implements ScalingAPI {
     @Override
     public Optional<Long> start(final JobConfiguration jobConfig) {
         jobConfig.fillInProperties();
-        if (jobConfig.getHandleConfig().getShardingTotalCount() == 0) {
+        if (jobConfig.getHandleConfig().getJobShardingCount() == 0) {
             log.warn("Invalid scaling job config!");
             throw new ScalingJobCreationException("handleConfig 
shardingTotalCount is 0");
         }
@@ -150,7 +150,7 @@ public final class ScalingAPIImpl implements ScalingAPI {
     private String createJobConfig(final JobConfiguration jobConfig) {
         JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
         
jobConfigPOJO.setJobName(String.valueOf(jobConfig.getHandleConfig().getJobId()));
-        
jobConfigPOJO.setShardingTotalCount(jobConfig.getHandleConfig().getShardingTotalCount());
+        
jobConfigPOJO.setShardingTotalCount(jobConfig.getHandleConfig().getJobShardingCount());
         jobConfigPOJO.setJobParameter(YamlEngine.marshal(jobConfig));
         jobConfigPOJO.getProps().setProperty("create_time", 
LocalDateTime.now().format(DATE_TIME_FORMATTER));
         return YamlEngine.marshal(jobConfigPOJO);
@@ -174,7 +174,7 @@ public final class ScalingAPIImpl implements ScalingAPI {
     
     @Override
     public Map<Integer, JobProgress> getProgress(final long jobId) {
-        return IntStream.range(0, 
getJobConfig(jobId).getHandleConfig().getShardingTotalCount()).boxed()
+        return IntStream.range(0, 
getJobConfig(jobId).getHandleConfig().getJobShardingCount()).boxed()
                 .collect(LinkedHashMap::new, (map, each) -> map.put(each, 
ScalingAPIFactory.getGovernanceRepositoryAPI().getJobProgress(jobId, each)), 
LinkedHashMap::putAll);
     }
     
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java
index 9c4e874..c0a7907 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java
@@ -22,6 +22,8 @@ import lombok.NoArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
 
+import java.util.List;
+
 /**
  * Handle configuration.
  */
@@ -37,16 +39,26 @@ public final class HandleConfiguration {
     
     private int retryTimes = 3;
     
-    private String[] shardingTables;
+    /**
+     * Collection of each logic table's first data node.
+     * <p>
+     * If <pre>actualDataNodes: ds_${0..1}.t_order_${0..1}</pre> and <pre>actualDataNodes: ds_${0..1}.t_order_item_${0..1}</pre>,
+     * then value may be: {@code ds_0.t_order_0,ds_0.t_order_item_0}.
+     * </p>
+     */
+    private String tablesFirstDataNodes;
+    
+    private List<String> jobShardingDataNodes;
     
     private String logicTables;
     
-    private int shardingItem;
+    /**
+     * Job sharding item, from {@link org.apache.shardingsphere.elasticjob.api.ShardingContext}.
+     */
+    private Integer jobShardingItem;
     
     private int shardingSize = 1000 * 10000;
     
-    private boolean running = true;
-    
     private String databaseType;
     
     private WorkflowConfiguration workflowConfig;
@@ -56,11 +68,11 @@ public final class HandleConfiguration {
     }
     
     /**
-     * Get sharding total count.
+     * Get job sharding count.
      *
-     * @return sharding total count
+     * @return job sharding count
      */
-    public int getShardingTotalCount() {
-        return null == shardingTables ? 0 : shardingTables.length;
+    public int getJobShardingCount() {
+        return null == jobShardingDataNodes ? 0 : jobShardingDataNodes.size();
     }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
index 1445b0b..38505c7 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
@@ -25,7 +25,6 @@ import lombok.NoArgsConstructor;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.migration.common.spi.RuleJobConfigurationPreparer;
-import 
org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.spi.typed.TypedSPIRegistry;
 
@@ -33,6 +32,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * Scaling job configuration.
@@ -42,16 +42,9 @@ import java.util.Optional;
 @Getter
 @Setter
 @Slf4j
+// TODO share for totally new scenario
 public final class JobConfiguration {
     
-    private static final SnowflakeKeyGenerateAlgorithm 
ID_AUTO_INCREASE_GENERATOR;
-    
-    static {
-        SnowflakeKeyGenerateAlgorithm generateAlgorithm = new 
SnowflakeKeyGenerateAlgorithm();
-        generateAlgorithm.init();
-        ID_AUTO_INCREASE_GENERATOR = generateAlgorithm;
-    }
-    
     static {
         
ShardingSphereServiceLoader.register(RuleJobConfigurationPreparer.class);
     }
@@ -66,13 +59,16 @@ public final class JobConfiguration {
     public void fillInProperties() {
         HandleConfiguration handleConfig = getHandleConfig();
         if (null == handleConfig.getJobId()) {
-            handleConfig.setJobId((Long) ID_AUTO_INCREASE_GENERATOR.generateKey());
+            handleConfig.setJobId(System.nanoTime() - ThreadLocalRandom.current().nextLong(100_0000));
         }
         if (Strings.isNullOrEmpty(handleConfig.getDatabaseType())) {
             
handleConfig.setDatabaseType(getRuleConfig().getSource().unwrap().getDatabaseType().getName());
         }
+        if (null == handleConfig.getJobShardingItem()) {
+            handleConfig.setJobShardingItem(0);
+        }
         RuleConfiguration ruleConfig = getRuleConfig();
-        if (null == handleConfig.getShardingTables()) {
+        if (null == handleConfig.getJobShardingDataNodes()) {
             List<HandleConfiguration> newHandleConfigs = new LinkedList<>();
             for (String each : 
ruleConfig.getChangedYamlRuleConfigClassNames()) {
                 Optional<RuleJobConfigurationPreparer> preparerOptional = 
TypedSPIRegistry.findRegisteredService(RuleJobConfigurationPreparer.class, 
each, null);
@@ -82,8 +78,9 @@ public final class JobConfiguration {
             }
             // TODO handle several rules changed or dataSources changed
             for (HandleConfiguration each : newHandleConfigs) {
-                handleConfig.setShardingTables(each.getShardingTables());
+                
handleConfig.setJobShardingDataNodes(each.getJobShardingDataNodes());
                 handleConfig.setLogicTables(each.getLogicTables());
+                
handleConfig.setTablesFirstDataNodes(each.getTablesFirstDataNodes());
             }
         }
     }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeEntry.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeEntry.java
new file mode 100644
index 0000000..2bb25b0
--- /dev/null
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeEntry.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.config.internal;
+
+import com.google.common.base.Splitter;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Job data node entry.
+ */
+@Getter
+@RequiredArgsConstructor
+@ToString
+public final class JobDataNodeEntry {
+    
+    @NonNull
+    private final String logicTableName;
+    
+    @NonNull
+    private final List<DataNode> dataNodes;
+    
+    /**
+     * Unmarshal from text.
+     *
+     * @param text marshalled entry
+     * @return entry
+     */
+    public static JobDataNodeEntry unmarshal(final String text) {
+        List<String> segments = Splitter.on(":").splitToList(text);
+        String logicTableName = segments.get(0);
+        List<DataNode> dataNodes = new LinkedList<>();
+        for (String each : 
Splitter.on(",").omitEmptyStrings().splitToList(segments.get(1))) {
+            dataNodes.add(new DataNode(each));
+        }
+        return new JobDataNodeEntry(logicTableName, dataNodes);
+    }
+    
+    /**
+     * Marshal to text.
+     *
+     * @return text, format: logicTableName:dataNode1,dataNode2, e.g. t_order:ds_0.t_order_0,ds_0.t_order_1
+     */
+    public String marshal() {
+        StringBuilder result = new StringBuilder(getMarshalledTextEstimatedLength());
+        result.append(logicTableName);
+        result.append(":");
+        for (DataNode each : dataNodes) {
+            result.append(each.format()).append(',');
+        }
+        if (!dataNodes.isEmpty()) {
+            result.setLength(result.length() - 1);
+        }
+        return result.toString();
+    }
+    
+    /**
+     * Get marshalled text estimated length.
+     *
+     * @return length
+     */
+    public int getMarshalledTextEstimatedLength() {
+        return logicTableName.length() + 1 + dataNodes.stream().mapToInt(DataNode::getFormattedTextLength).sum() + dataNodes.size();
+    }
+}
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeLine.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeLine.java
new file mode 100644
index 0000000..57ed60d
--- /dev/null
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeLine.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.config.internal;
+
+import com.google.common.base.Splitter;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Job data node line.
+ */
+@Getter
+@RequiredArgsConstructor
+@ToString
+public final class JobDataNodeLine {
+    
+    @NonNull
+    private final List<JobDataNodeEntry> entries;
+    
+    /**
+     * Unmarshal from text.
+     *
+     * @param text marshalled line
+     * @return line
+     */
+    public static JobDataNodeLine unmarshal(final String text) {
+        List<String> segments = 
Splitter.on('|').omitEmptyStrings().splitToList(text);
+        List<JobDataNodeEntry> entries = new ArrayList<>(segments.size());
+        for (String each : segments) {
+            entries.add(JobDataNodeEntry.unmarshal(each));
+        }
+        return new JobDataNodeLine(entries);
+    }
+    
+    /**
+     * Marshal to text.
+     *
+     * @return text, format: entry1|entry2, e.g. t_order:ds_0.t_order_0,ds_0.t_order_1|t_order_item:ds_0.t_order_item_0,ds_0.t_order_item_1
+     */
+    public String marshal() {
+        StringBuilder result = new StringBuilder(getMarshalledTextEstimatedLength());
+        for (JobDataNodeEntry each : entries) {
+            result.append(each.marshal()).append('|');
+        }
+        if (!entries.isEmpty()) {
+            result.setLength(result.length() - 1);
+        }
+        return result.toString();
+    }
+    
+    private int getMarshalledTextEstimatedLength() {
+        return entries.stream().mapToInt(JobDataNodeEntry::getMarshalledTextEstimatedLength).sum() + entries.size();
+    }
+}
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java
index 175921c..ade2f2b 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java
@@ -58,7 +58,7 @@ public final class JobContext {
         this.jobConfig = jobConfig;
         jobConfig.fillInProperties();
         jobId = jobConfig.getHandleConfig().getJobId();
-        shardingItem = jobConfig.getHandleConfig().getShardingItem();
+        shardingItem = jobConfig.getHandleConfig().getJobShardingItem();
         taskConfigs = jobConfig.convertToTaskConfigs();
     }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
index dd6fd09..e117a25 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
@@ -41,7 +41,7 @@ public final class ScalingJob implements SimpleJob {
     public void execute(final ShardingContext shardingContext) {
         log.info("Execute scaling job {}-{}", shardingContext.getJobName(), 
shardingContext.getShardingItem());
         JobConfiguration jobConfig = 
YamlEngine.unmarshal(shardingContext.getJobParameter(), JobConfiguration.class, 
true);
-        
jobConfig.getHandleConfig().setShardingItem(shardingContext.getShardingItem());
+        
jobConfig.getHandleConfig().setJobShardingItem(shardingContext.getShardingItem());
         JobContext jobContext = new JobContext(jobConfig);
         
jobContext.setInitProgress(governanceRepositoryAPI.getJobProgress(jobContext.getJobId(),
 jobContext.getShardingItem()));
         jobContext.setJobPreparer(jobPreparer);
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java
index 11d789a..d0c787f 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java
@@ -62,6 +62,7 @@ public final class DataConsistencyCheckerImpl implements 
DataConsistencyChecker
     
     private final DataSourceFactory dataSourceFactory = new 
DataSourceFactory();
     
+    // TODO replace to JobConfiguration
     private final JobContext jobContext;
     
     @Override
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/DataSourcePreparer.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/DataSourcePreparer.java
index 94f0a81..41b9e62 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/DataSourcePreparer.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/DataSourcePreparer.java
@@ -17,8 +17,6 @@
 
 package org.apache.shardingsphere.scaling.core.job.preparer;
 
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-
 /**
  * Data source preparer.
  */
@@ -27,7 +25,7 @@ public interface DataSourcePreparer {
     /**
      * Prepare target tables.
      *
-     * @param jobConfig job configuration
+     * @param parameter prepare target tables parameter
      */
-    void prepareTargetTables(JobConfiguration jobConfig);
+    void prepareTargetTables(PrepareTargetTablesParameter parameter);
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/DataSourcePreparer.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/PrepareTargetTablesParameter.java
similarity index 64%
copy from 
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/DataSourcePreparer.java
copy to 
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/PrepareTargetTablesParameter.java
index 94f0a81..398bea1 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/DataSourcePreparer.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/PrepareTargetTablesParameter.java
@@ -17,17 +17,22 @@
 
 package org.apache.shardingsphere.scaling.core.job.preparer;
 
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
+import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeLine;
 
 /**
- * Data source preparer.
+ * Prepare target tables parameter.
  */
-public interface DataSourcePreparer {
+@RequiredArgsConstructor
+@Getter
+public final class PrepareTargetTablesParameter {
     
-    /**
-     * Prepare target tables.
-     *
-     * @param jobConfig job configuration
-     */
-    void prepareTargetTables(JobConfiguration jobConfig);
+    @NonNull
+    private final JobDataNodeLine tablesFirstDataNodes;
+    
+    @NonNull
+    private final RuleConfiguration ruleConfig;
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
index f632f2a..93684eb 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
@@ -25,6 +25,7 @@ import 
org.apache.shardingsphere.infra.config.datasource.typed.TypedDataSourceCo
 import 
org.apache.shardingsphere.scaling.core.common.exception.PrepareFailedException;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
+import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeLine;
 import org.apache.shardingsphere.scaling.core.job.JobContext;
 import org.apache.shardingsphere.scaling.core.job.JobStatus;
 import 
org.apache.shardingsphere.scaling.core.job.check.EnvironmentCheckerFactory;
@@ -74,7 +75,9 @@ public final class ScalingJobPreparer {
             log.info("dataSourcePreparer null, ignore prepare target");
             return;
         }
-        dataSourcePreparer.prepareTargetTables(jobConfig);
+        JobDataNodeLine tablesFirstDataNodes = 
JobDataNodeLine.unmarshal(jobConfig.getHandleConfig().getTablesFirstDataNodes());
+        PrepareTargetTablesParameter prepareTargetTablesParameter = new 
PrepareTargetTablesParameter(tablesFirstDataNodes, jobConfig.getRuleConfig());
+        dataSourcePreparer.prepareTargetTables(prepareTargetTablesParameter);
     }
     
     private void initDataSourceManager(final DataSourceManager 
dataSourceManager, final List<TaskConfiguration> taskConfigs) {
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ScalingTaskUtil.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ScalingTaskUtil.java
index 809b2cb..3f005d3 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ScalingTaskUtil.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ScalingTaskUtil.java
@@ -55,7 +55,7 @@ public final class ScalingTaskUtil {
     }
     
     private static boolean isProgressCompleted(final Map<Integer, JobProgress> 
jobProgressMap, final HandleConfiguration handleConfig) {
-        return handleConfig.getShardingTotalCount() == jobProgressMap.size()
+        return handleConfig.getJobShardingCount() == jobProgressMap.size()
                 && jobProgressMap.values().stream().allMatch(Objects::nonNull);
     }
     
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleJobConfigurationPreparer.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleJobConfigurationPreparer.java
index 6ec0e9b..90596a4 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleJobConfigurationPreparer.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleJobConfigurationPreparer.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.sharding.schedule;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -35,6 +36,8 @@ import 
org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
 import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
+import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeEntry;
+import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeLine;
 import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
 import 
org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
 import 
org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
@@ -75,11 +78,18 @@ public final class ShardingRuleJobConfigurationPreparer 
implements RuleJobConfig
         for (Entry<String, List<DataNode>> entry : 
shouldScalingActualDataNodes.entrySet()) {
             dataNodes.addAll(entry.getValue());
         }
-        result.setShardingTables(groupByDataSource(dataNodes));
+        result.setJobShardingDataNodes(groupByDataSource(dataNodes));
         
result.setLogicTables(getLogicTables(shouldScalingActualDataNodes.keySet()));
+        
result.setTablesFirstDataNodes(getTablesFirstDataNodes(shouldScalingActualDataNodes));
         return result;
     }
     
+    /**
+     * Get scaling actual data nodes.
+     *
+     * @param ruleConfig rule configuration
+     * @return map(logic table name, DataNode of each logic table)
+     */
     private static Map<String, List<DataNode>> 
getShouldScalingActualDataNodes(final RuleConfiguration ruleConfig) {
         TypedDataSourceConfiguration sourceConfig = 
ruleConfig.getSource().unwrap();
         Preconditions.checkState(sourceConfig instanceof 
ShardingSphereJDBCDataSourceConfiguration,
@@ -95,21 +105,28 @@ public final class ShardingRuleJobConfigurationPreparer 
implements RuleJobConfig
         return result;
     }
     
-    private static String[] groupByDataSource(final Collection<DataNode> 
dataNodes) {
+    private static List<String> groupByDataSource(final Collection<DataNode> 
dataNodes) {
         Map<String, Collection<DataNode>> dataSourceDataNodesMap = new 
LinkedHashMap<>();
         for (DataNode each : dataNodes) {
             dataSourceDataNodesMap.computeIfAbsent(each.getDataSourceName(), k 
-> new LinkedList<>()).add(each);
         }
-        return dataSourceDataNodesMap.values().stream().map(each -> 
each.stream().map(dataNode -> String.format("%s.%s", 
dataNode.getDataSourceName(), dataNode.getTableName()))
-                .collect(Collectors.joining(","))).toArray(String[]::new);
+        return dataSourceDataNodesMap.values().stream().map(each -> 
each.stream().map(DataNode::format)
+                
.collect(Collectors.joining(","))).collect(Collectors.toList());
     }
     
     private static String getLogicTables(final Set<String> logicTables) {
-        return logicTables.stream()
-                .reduce((a, b) -> String.format("%s, %s", a, b))
-                .orElse("");
+        return Joiner.on(',').join(logicTables);
     }
     
+    private static String getTablesFirstDataNodes(final Map<String, 
List<DataNode>> actualDataNodes) {
+        List<JobDataNodeEntry> dataNodeEntries = new 
ArrayList<>(actualDataNodes.size());
+        for (Entry<String, List<DataNode>> entry : actualDataNodes.entrySet()) 
{
+            dataNodeEntries.add(new JobDataNodeEntry(entry.getKey(), 
entry.getValue().subList(0, 1)));
+        }
+        return new JobDataNodeLine(dataNodeEntries).marshal();
+    }
+    
+    // TODO handle several rules changed or dataSources changed
     @Override
     public List<TaskConfiguration> convertToTaskConfigs(final JobConfiguration 
jobConfig) {
         List<TaskConfiguration> result = new LinkedList<>();
@@ -145,38 +162,36 @@ public final class ShardingRuleJobConfigurationPreparer 
implements RuleJobConfig
         return Optional.empty();
     }
     
-    private static void filterByShardingDataSourceTables(final Map<String, 
Map<String, String>> dataSourceTableNameMap, final HandleConfiguration handleConfig) {
-        if (null == handleConfig.getShardingTables()) {
-            log.info("shardingTables null");
+    private static void filterByShardingDataSourceTables(final Map<String, Map<String, String>> totalDataSourceTableNameMap, final HandleConfiguration handleConfig) {
+        if (null == handleConfig.getJobShardingDataNodes()) {
+            log.info("jobShardingDataNodes null");
             return;
         }
-        Map<String, Set<String>> shardingDataSourceTableMap = toDataSourceTableNameMap(getShardingDataSourceTables(handleConfig));
-        dataSourceTableNameMap.entrySet().removeIf(entry -> !shardingDataSourceTableMap.containsKey(entry.getKey()));
-        for (Entry<String, Map<String, String>> entry : dataSourceTableNameMap.entrySet()) {
-            filterByShardingTables(entry.getValue(), shardingDataSourceTableMap.get(entry.getKey()));
+        // TODO simplify data source and table name converting, and jobShardingDataNodes format
+        Map<String, Set<String>> jobDataSourceTableNameMap = toDataSourceTableNameMap(getJobShardingDataNodesEntry(handleConfig));
+        totalDataSourceTableNameMap.entrySet().removeIf(entry -> !jobDataSourceTableNameMap.containsKey(entry.getKey()));
+        for (Entry<String, Map<String, String>> entry : totalDataSourceTableNameMap.entrySet()) {
+            filterByShardingTables(entry.getValue(), jobDataSourceTableNameMap.get(entry.getKey()));
         }
     }
     
-    private static String getShardingDataSourceTables(final HandleConfiguration handleConfig) {
-        if (handleConfig.getShardingItem() >= handleConfig.getShardingTables().length) {
-            log.warn("shardingItem={} ge handleConfig.shardingTables.len={}", handleConfig.getShardingItem(), handleConfig.getShardingTables().length);
+    private static String getJobShardingDataNodesEntry(final HandleConfiguration handleConfig) {
+        if (handleConfig.getJobShardingItem() >= handleConfig.getJobShardingDataNodes().size()) {
+            log.warn("jobShardingItem={} ge handleConfig.jobShardingDataNodes.len={}", handleConfig.getJobShardingItem(), handleConfig.getJobShardingDataNodes().size());
             return "";
         }
-        return handleConfig.getShardingTables()[handleConfig.getShardingItem()];
+        return handleConfig.getJobShardingDataNodes().get(handleConfig.getJobShardingItem());
     }
     
     private static void filterByShardingTables(final Map<String, String> fullTables, final Set<String> shardingTables) {
         fullTables.entrySet().removeIf(entry -> !shardingTables.contains(entry.getKey()));
     }
     
-    private static Map<String, Set<String>> toDataSourceTableNameMap(final String shardingDataSourceTables) {
+    private static Map<String, Set<String>> toDataSourceTableNameMap(final String jobShardingDataNodesEntry) {
         Map<String, Set<String>> result = new HashMap<>();
-        for (String each : new InlineExpressionParser(shardingDataSourceTables).splitAndEvaluate()) {
-            String[] table = each.split("\\.");
-            if (!result.containsKey(table[0])) {
-                result.put(table[0], new HashSet<>());
-            }
-            result.get(table[0]).add(table[1]);
+        for (String each : new InlineExpressionParser(jobShardingDataNodesEntry).splitAndEvaluate()) {
+            DataNode dataNode = new DataNode(each);
+            result.computeIfAbsent(dataNode.getDataSourceName(), k -> new HashSet<>()).add(dataNode.getTableName());
         }
         return result;
     }
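
Note: the reworked toDataSourceTableNameMap above now lets DataNode parse each "dataSource.tableName" token instead of splitting on "." by hand. A standalone sketch of the same grouping step, using only JDK collections (the real code goes through InlineExpressionParser and DataNode from shardingsphere-infra):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    
    public final class DataNodeGroupingSketch {
        
        public static void main(final String[] args) {
            // "ds_0.t_order_0,ds_0.t_order_1" stands in for an already expanded inline expression.
            Map<String, Set<String>> actualTablesByDataSource = new HashMap<>();
            for (String each : "ds_0.t_order_0,ds_0.t_order_1".split(",")) {
                int dotIndex = each.indexOf('.');
                String dataSourceName = each.substring(0, dotIndex);
                String tableName = each.substring(dotIndex + 1);
                actualTablesByDataSource.computeIfAbsent(dataSourceName, key -> new HashSet<>()).add(tableName);
            }
            // Prints {ds_0=[t_order_1, t_order_0]} (set ordering is not guaranteed).
            System.out.println(actualTablesByDataSource);
        }
    }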
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/migration/common/job/preparer/AbstractDataSourcePreparerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/migration/common/job/preparer/AbstractDataSourcePreparerTest.java
index 5c70ce2..528abd5 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/migration/common/job/preparer/AbstractDataSourcePreparerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/migration/common/job/preparer/AbstractDataSourcePreparerTest.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.migration.common.job.preparer;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.job.preparer.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.scaling.core.job.preparer.TableDefinitionSQLType;
 import org.junit.Test;
 
@@ -38,7 +38,7 @@ public final class AbstractDataSourcePreparerTest {
     
     private final AbstractDataSourcePreparer preparer = new AbstractDataSourcePreparer() {
         @Override
-        public void prepareTargetTables(final JobConfiguration jobConfig) {
+        public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
         }
     };
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
index 0505851..c7cfc85 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
@@ -75,7 +75,7 @@ public final class ScalingAPIImplTest {
         assertThat(jobInfo.getTables(), is("t_order"));
         assertThat(jobInfo.getShardingTotalCount(), is(1));
         List<Long> uncompletedJobIds = scalingAPI.getUncompletedJobIds("logic_db");
-        assertThat(uncompletedJobIds.size(), is(0));
+        assertTrue(uncompletedJobIds.size() > 0);
     }
     
     private Optional<JobInfo> getJobInfo(final long jobId) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeEntryTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeEntryTest.java
new file mode 100644
index 0000000..443c772
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeEntryTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.config.internal;
+
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+public final class JobDataNodeEntryTest {
+    
+    @Test
+    public void assertSerialization() {
+        String text = "t_order:ds_0.t_order_0,ds_0.t_order_1";
+        JobDataNodeEntry actual = JobDataNodeEntry.unmarshal(text);
+        assertNotNull(actual);
+        assertThat(actual.marshal(), is(text));
+        assertThat(actual.getLogicTableName(), is("t_order"));
+        List<DataNode> dataNodes = actual.getDataNodes();
+        assertNotNull(dataNodes);
+        assertThat(dataNodes.size(), is(2));
+        assertThat(dataNodes.get(0).format(), is("ds_0.t_order_0"));
+        assertThat(dataNodes.get(1).format(), is("ds_0.t_order_1"));
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeLineTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeLineTest.java
new file mode 100644
index 0000000..09acb46
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeLineTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.config.internal;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+public final class JobDataNodeLineTest {
+    
+    @Test
+    public void assertSerialization() {
+        String text = "t_order:ds_0.t_order_0,ds_0.t_order_1|t_order_item:ds_0.t_order_item_0,ds_0.t_order_item_1";
+        JobDataNodeLine actual = JobDataNodeLine.unmarshal(text);
+        assertNotNull(actual);
+        assertThat(actual.marshal(), is(text));
+        List<JobDataNodeEntry> entries = actual.getEntries();
+        assertNotNull(entries);
+        assertThat(entries.size(), is(2));
+        assertThat(entries.get(0).getLogicTableName(), is("t_order"));
+        assertThat(entries.get(1).getLogicTableName(), is("t_order_item"));
+    }
+}
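
Note: the two new tests pin down the serialization format: entries are joined with "|", and each entry is "logicTable:dataSource.table,dataSource.table". A short usage sketch built only on the methods exercised above (unmarshal, marshal, getEntries, getLogicTableName, getDataNodes, format); anything beyond those calls is an assumption:

    import org.apache.shardingsphere.infra.datanode.DataNode;
    import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeEntry;
    import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeLine;
    
    public final class JobDataNodeLineUsageSketch {
        
        public static void main(final String[] args) {
            String text = "t_order:ds_0.t_order_0,ds_0.t_order_1|t_order_item:ds_0.t_order_item_0,ds_0.t_order_item_1";
            JobDataNodeLine line = JobDataNodeLine.unmarshal(text);
            for (JobDataNodeEntry entry : line.getEntries()) {
                for (DataNode dataNode : entry.getDataNodes()) {
                    // e.g. "t_order -> ds_0.t_order_0"
                    System.out.println(entry.getLogicTableName() + " -> " + dataNode.format());
                }
            }
            // marshal() reproduces the original text.
            System.out.println(text.equals(line.marshal()));
        }
    }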
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporterTest.java
index 089d49d..c22331b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporterTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.scaling.core.executor.importer;
 
 import org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.Channel;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
@@ -35,7 +36,6 @@ import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -75,8 +75,8 @@ public final class AbstractImporterTest {
     @Mock
     private Channel channel;
     
-    @Mock(extraInterfaces = AutoCloseable.class)
-    private DataSource dataSource;
+    @Mock
+    private DataSourceWrapper dataSource;
     
     @Mock
     private Connection connection;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataSourcePreparer.java
index 0230e98..7c3153a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataSourcePreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataSourcePreparer.java
@@ -17,12 +17,12 @@
 
 package org.apache.shardingsphere.scaling.core.fixture;
 
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.job.preparer.DataSourcePreparer;
+import org.apache.shardingsphere.scaling.core.job.preparer.PrepareTargetTablesParameter;
 
 public final class FixtureDataSourcePreparer implements DataSourcePreparer {
     
     @Override
-    public void prepareTargetTables(final JobConfiguration jobConfig) {
+    public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ResourceUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ResourceUtil.java
index feb585d..2ec25d0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ResourceUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ResourceUtil.java
@@ -22,8 +22,10 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.infra.config.datasource.typed.ShardingSphereJDBCDataSourceConfiguration;
 import org.apache.shardingsphere.infra.config.datasource.typed.StandardJDBCDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
+import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -73,12 +75,20 @@ public final class ResourceUtil {
      * @return standard JDBC as target job configuration
      */
     public static JobConfiguration mockStandardJdbcTargetJobConfig() {
+        JobConfiguration result = new JobConfiguration();
         RuleConfiguration ruleConfig = new RuleConfiguration();
+        result.setRuleConfig(ruleConfig);
         setupChangedYamlRuleConfigClassNames(ruleConfig);
         ruleConfig.setSource(new ShardingSphereJDBCDataSourceConfiguration(readFileToString("/config_sharding_sphere_jdbc_source.yaml")).wrap());
         ruleConfig.setTarget(new StandardJDBCDataSourceConfiguration(readFileToString("/config_standard_jdbc_target.yaml")).wrap());
-        JobConfiguration result = new JobConfiguration();
-        result.setRuleConfig(ruleConfig);
+        HandleConfiguration handleConfig = new HandleConfiguration();
+        result.setHandleConfig(handleConfig);
+        handleConfig.setJobShardingItem(0);
+        handleConfig.setLogicTables("t_order");
+        handleConfig.setTablesFirstDataNodes("t_order:ds_0.t_order");
+        handleConfig.setJobShardingDataNodes(Collections.singletonList("ds_0.t_order"));
+        handleConfig.setWorkflowConfig(new WorkflowConfiguration("logic_db", "id1"));
+        result.fillInProperties();
         return result;
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java
index 209c258..0746253 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java
@@ -21,7 +21,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceWrapper;
 import org.apache.shardingsphere.migration.common.job.preparer.AbstractDataSourcePreparer;
 import org.apache.shardingsphere.scaling.core.common.exception.PrepareFailedException;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
+import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeEntry;
+import org.apache.shardingsphere.scaling.core.job.preparer.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.scaling.mysql.component.MySQLScalingSQLBuilder;
 
 import java.sql.Connection;
@@ -30,6 +32,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.stream.Collectors;
 
 /**
  * Data source preparer for MySQL.
@@ -40,12 +43,13 @@ public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
     private final MySQLScalingSQLBuilder scalingSQLBuilder = new MySQLScalingSQLBuilder(Collections.emptyMap());
     
     @Override
-    public void prepareTargetTables(final JobConfiguration jobConfig) {
-        try (DataSourceWrapper sourceDataSource = getSourceDataSource(jobConfig);
+    public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
+        RuleConfiguration ruleConfig = parameter.getRuleConfig();
+        try (DataSourceWrapper sourceDataSource = getSourceDataSource(ruleConfig);
              Connection sourceConnection = sourceDataSource.getConnection();
-             DataSourceWrapper targetDataSource = getTargetDataSource(jobConfig);
+             DataSourceWrapper targetDataSource = getTargetDataSource(ruleConfig);
             Connection targetConnection = targetDataSource.getConnection()) {
-            Collection<String> logicTableNames = getLogicTableNames(jobConfig.getRuleConfig().getSource().unwrap());
+            Collection<String> logicTableNames = parameter.getTablesFirstDataNodes().getEntries().stream().map(JobDataNodeEntry::getLogicTableName).collect(Collectors.toList());
             for (String each : logicTableNames) {
                 String createTableSQL = getCreateTableSQL(sourceConnection, each);
                 createTableSQL = addIfNotExistsForCreateTableSQL(createTableSQL);
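
Note: with the new parameter object, the MySQL preparer derives logic table names from tablesFirstDataNodes instead of unwrapping the source rule configuration. A minimal sketch of that projection, assuming a JobDataNodeLine unmarshalled from the same text format as in the new tests:

    import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeEntry;
    import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeLine;
    
    import java.util.Collection;
    import java.util.stream.Collectors;
    
    public final class LogicTableNamesSketch {
        
        public static void main(final String[] args) {
            JobDataNodeLine tablesFirstDataNodes = JobDataNodeLine.unmarshal("t_order:ds_0.t_order_0|t_order_item:ds_0.t_order_item_0");
            // Same projection as prepareTargetTables above: one logic table name per entry.
            Collection<String> logicTableNames = tablesFirstDataNodes.getEntries().stream().map(JobDataNodeEntry::getLogicTableName).collect(Collectors.toList());
            // Prints [t_order, t_order_item].
            System.out.println(logicTableNames);
        }
    }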
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataSourcePreparerTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataSourcePreparerTest.java
index cfd9b5b..40080e8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataSourcePreparerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataSourcePreparerTest.java
@@ -19,12 +19,11 @@ package org.apache.shardingsphere.scaling.mysql.component;
 
 import org.apache.shardingsphere.infra.config.datasource.typed.ShardingSphereJDBCDataSourceConfiguration;
 import org.apache.shardingsphere.infra.config.datasource.typed.TypedDataSourceConfigurationWrap;
-import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import org.apache.shardingsphere.scaling.core.common.exception.PrepareFailedException;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
+import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeLine;
+import org.apache.shardingsphere.scaling.core.job.preparer.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.scaling.mysql.component.checker.MySQLDataSourcePreparer;
-import org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -42,7 +41,7 @@ import static org.mockito.Mockito.when;
 public final class MySQLDataSourcePreparerTest {
 
     @Mock
-    private JobConfiguration jobConfiguration;
+    private PrepareTargetTablesParameter prepareTargetTablesParameter;
 
     @Mock
     private RuleConfiguration ruleConfiguration;
@@ -65,37 +64,30 @@ public final class MySQLDataSourcePreparerTest {
     @Mock(extraInterfaces = AutoCloseable.class)
     private DataSource targetDataSource;
 
-    @Mock
-    private YamlRootConfiguration yamlRootConfiguration;
-
-    @Mock
-    private YamlShardingRuleConfiguration yamlShardingRuleConfiguration;
-
     @Before
     public void setUp() throws SQLException {
-        when(jobConfiguration.getRuleConfig()).thenReturn(ruleConfiguration);
+        when(prepareTargetTablesParameter.getRuleConfig()).thenReturn(ruleConfiguration);
+        when(prepareTargetTablesParameter.getTablesFirstDataNodes()).thenReturn(new JobDataNodeLine(Collections.emptyList()));
         when(ruleConfiguration.getSource()).thenReturn(sourceDataSourceConfigurationWrap);
         when(sourceDataSourceConfigurationWrap.unwrap()).thenReturn(sourceScalingDataSourceConfiguration);
         when(sourceScalingDataSourceConfiguration.toDataSource()).thenReturn(sourceDataSource);
-        when(sourceScalingDataSourceConfiguration.getRootConfig()).thenReturn(yamlRootConfiguration);
-        when(yamlRootConfiguration.getRules()).thenReturn(Collections.singletonList(yamlShardingRuleConfiguration));
         when(ruleConfiguration.getTarget()).thenReturn(targetDataSourceConfigurationWrap);
         when(targetDataSourceConfigurationWrap.unwrap()).thenReturn(targetScalingDataSourceConfiguration);
         when(targetScalingDataSourceConfiguration.toDataSource()).thenReturn(targetDataSource);
     }
-
+    
     @Test
     public void assertGetConnection() throws SQLException {
         MySQLDataSourcePreparer mySQLDataSourcePreparer = new MySQLDataSourcePreparer();
-        mySQLDataSourcePreparer.prepareTargetTables(jobConfiguration);
+        mySQLDataSourcePreparer.prepareTargetTables(prepareTargetTablesParameter);
         verify(sourceDataSource).getConnection();
         verify(targetDataSource).getConnection();
     }
-
+    
     @Test(expected = PrepareFailedException.class)
     public void assertThrowPrepareFailedException() throws SQLException {
         when(sourceDataSource.getConnection()).thenThrow(SQLException.class);
         MySQLDataSourcePreparer mySQLDataSourcePreparer = new MySQLDataSourcePreparer();
-        mySQLDataSourcePreparer.prepareTargetTables(jobConfiguration);
+        mySQLDataSourcePreparer.prepareTargetTables(prepareTargetTablesParameter);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java
index 630ee3e..12d277c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java
@@ -18,14 +18,17 @@
 package org.apache.shardingsphere.scaling.opengauss.component.checker;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceWrapper;
+import org.apache.shardingsphere.infra.config.datasource.typed.ShardingSphereJDBCDataSourceConfiguration;
+import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.migration.common.job.preparer.AbstractDataSourcePreparer;
 import org.apache.shardingsphere.scaling.core.common.exception.PrepareFailedException;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeEntry;
 import org.apache.shardingsphere.scaling.core.job.preparer.ActualTableDefinition;
+import org.apache.shardingsphere.scaling.core.job.preparer.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.scaling.core.job.preparer.TableDefinitionSQLType;
 
-import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -48,15 +51,15 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
     private static final String WITH_OF_TABLE_EXTEND = "with (";
     
     @Override
-    public void prepareTargetTables(final JobConfiguration jobConfig) {
+    public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
         Collection<ActualTableDefinition> actualTableDefinitions;
         try {
-            actualTableDefinitions = getActualTableDefinitions(jobConfig);
+            actualTableDefinitions = getActualTableDefinitions(parameter);
         } catch (final SQLException ex) {
             throw new PrepareFailedException("get table definitions failed.", ex);
         }
         Map<String, Collection<String>> createLogicTableSQLs = getCreateLogicTableSQLs(actualTableDefinitions);
-        try (DataSourceWrapper targetDataSource = getTargetDataSource(jobConfig);
+        try (DataSourceWrapper targetDataSource = getTargetDataSource(parameter.getRuleConfig());
             Connection targetConnection = targetDataSource.getConnection()) {
             for (Entry<String, Collection<String>> entry : createLogicTableSQLs.entrySet()) {
                 for (String each : entry.getValue()) {
@@ -69,17 +72,19 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
         }
     }
     
-    private Collection<ActualTableDefinition> getActualTableDefinitions(final JobConfiguration jobConfig) throws SQLException {
+    private Collection<ActualTableDefinition> getActualTableDefinitions(final PrepareTargetTablesParameter parameter) throws SQLException {
         Collection<ActualTableDefinition> result = new ArrayList<>();
-        Map<DataSource, Map<String, String>> dataSourceTableNamesMap = getDataSourceTableNamesMap(jobConfig.getRuleConfig().getSource().unwrap());
-        for (Entry<DataSource, Map<String, String>> entry : dataSourceTableNamesMap.entrySet()) {
-            try (DataSourceWrapper dataSource = new DataSourceWrapper(entry.getKey());
-                 Connection sourceConnection = dataSource.getConnection()) {
-                for (Entry<String, String> tableNameEntry : entry.getValue().entrySet()) {
-                    String actualTableName = tableNameEntry.getKey();
+        ShardingSphereJDBCDataSourceConfiguration sourceConfig = (ShardingSphereJDBCDataSourceConfiguration) parameter.getRuleConfig().getSource().unwrap();
+        try (DataSourceManager dataSourceManager = new DataSourceManager()) {
+            for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
+                DataNode dataNode = each.getDataNodes().get(0);
+                // Keep dataSource to reuse
+                DataSourceWrapper dataSource = dataSourceManager.getDataSource(sourceConfig.getActualDataSourceConfig(dataNode.getDataSourceName()));
+                try (Connection sourceConnection = dataSource.getConnection()) {
+                    String actualTableName = dataNode.getTableName();
                     int oid = queryTableOid(sourceConnection, actualTableName);
                     String tableDefinition = queryTableDefinition(sourceConnection, oid);
-                    String logicTableName = tableNameEntry.getValue();
+                    String logicTableName = each.getLogicTableName();
                     result.add(new ActualTableDefinition(logicTableName, actualTableName, tableDefinition));
                 }
             }
@@ -158,7 +163,7 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
         String[] search = {WITH_OF_TABLE_EXTEND, ")"};
         List<Integer> searchPos = new ArrayList<>(2);
         int startPos = 0;
-        for (String each: search) {
+        for (String each : search) {
             int curSearch = lowerCreateSQL.indexOf(each, startPos);
             if (curSearch <= 0) {
                 break;
