This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f4b22c3  Overwrite InventoryDataTaskSplitter.splitByPrimaryKeyRange 
(#8422)
f4b22c3 is described below

commit f4b22c3d2987b8cca86778ca32b98c6091d1f1e9
Author: 邱鹿 Lucas <[email protected]>
AuthorDate: Tue Dec 1 11:39:48 2020 +0800

    Overwrite InventoryDataTaskSplitter.splitByPrimaryKeyRange (#8422)
    
    * Overwrite InventoryDataTaskSplitter.splitByPrimaryKeyRange()
    
    * Rename SQLBuilder to ScalingSQLBuilder
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../scaling/fixture/FixtureH2ScalingEntry.java     |  6 ++
 .../core/config/InventoryDumperConfiguration.java  |  2 +-
 .../scaling/core/config/JobConfiguration.java      |  2 +
 .../scaling/core/config/SyncConfiguration.java     |  2 +-
 .../executor/importer/AbstractJDBCImporter.java    | 13 ++--
 .../AbstractScalingSQLBuilder.java}                | 39 ++++--------
 .../executor/sqlbuilder/ScalingSQLBuilder.java     | 72 ++++++++++++++++++++++
 .../sqlbuilder/ScalingSQLBuilderFactory.java       | 43 +++++++++++++
 .../job/check/AbstractDataConsistencyChecker.java  |  4 +-
 .../job/preparer/ShardingScalingJobPreparer.java   |  8 +--
 .../job/preparer/resumer/SyncPositionResumer.java  |  4 +-
 .../splitter/InventoryDataTaskSplitter.java        | 54 ++++++++--------
 .../task/inventory/InventoryDataScalingTask.java   |  2 +-
 .../scaling/core/spi/ScalingEntry.java             |  8 +++
 .../scaling/core/utils/SyncConfigurationUtil.java  |  2 +-
 .../importer/AbstractJDBCImporterTest.java         | 15 ++---
 .../executor/importer/AbstractSqlBuilderTest.java  | 31 +++-------
 .../fixture/FixtureDataConsistencyChecker.java     | 16 +----
 .../core/fixture/FixtureH2ScalingEntry.java        |  6 ++
 .../core/fixture/FixtureScalingSQLBuilder.java}    | 34 +++++-----
 .../splitter/InventoryDataTaskSplitterTest.java    | 36 +++++++----
 .../inventory/InventoryDataScalingTaskTest.java    |  3 +-
 .../scaling/mysql/MySQLScalingEntry.java           |  7 +++
 .../component/MySQLDataConsistencyChecker.java     |  4 +-
 .../scaling/mysql/component/MySQLImporter.java     |  6 +-
 ...SQLBuilder.java => MySQLScalingSQLBuilder.java} |  7 ++-
 ...erTest.java => MySQLScalingSQLBuilderTest.java} |  4 +-
 .../scaling/postgresql/PostgreSQLScalingEntry.java |  7 +++
 .../PostgreSQLDataConsistencyChecker.java          |  6 +-
 .../postgresql/component/PostgreSQLImporter.java   |  6 +-
 ...ilder.java => PostgreSQLScalingSQLBuilder.java} |  7 ++-
 ...t.java => PostgreSQLScalingSQLBuilderTest.java} |  4 +-
 32 files changed, 295 insertions(+), 165 deletions(-)

diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
 
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
index 9996ecb..bd86ae5 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.fixture;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import 
org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
@@ -58,6 +59,11 @@ public final class FixtureH2ScalingEntry implements 
ScalingEntry {
     }
     
     @Override
+    public Class<? extends ScalingSQLBuilder> getSQLBuilderClass() {
+        return null;
+    }
+    
+    @Override
     public String getDatabaseType() {
         return "H2";
     }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/InventoryDumperConfiguration.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/InventoryDumperConfiguration.java
index d004c52..839bcd0 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/InventoryDumperConfiguration.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/InventoryDumperConfiguration.java
@@ -31,7 +31,7 @@ public final class InventoryDumperConfiguration extends 
DumperConfiguration {
     
     private String primaryKey;
     
-    private Integer spiltNum;
+    private Integer shardingItem;
     
     public InventoryDumperConfiguration(final DumperConfiguration 
dumperConfig) {
         setDataSourceName(dumperConfig.getDataSourceName());
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
index 8fb2f35..cb6bc0d 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
@@ -37,5 +37,7 @@ public final class JobConfiguration {
     
     private int shardingItem;
     
+    private int shardingSize = 10000000;
+    
     private boolean running = true;
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java
index abffdcc..566c29b 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java
@@ -27,7 +27,7 @@ import lombok.RequiredArgsConstructor;
 @RequiredArgsConstructor
 public final class SyncConfiguration {
     
-    private final int concurrency;
+    private final JobConfiguration jobConfig;
     
     private final DumperConfiguration dumperConfig;
     
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
index ce0f5c5..9f21632 100755
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
@@ -32,6 +32,7 @@ import 
org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRe
 import 
org.apache.shardingsphere.scaling.core.execute.executor.record.GroupedDataRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -54,7 +55,7 @@ public abstract class AbstractJDBCImporter extends 
AbstractShardingScalingExecut
     
     private final DataSourceManager dataSourceManager;
     
-    private final AbstractSQLBuilder sqlBuilder;
+    private final ScalingSQLBuilder scalingSqlBuilder;
     
     @Setter
     private Channel channel;
@@ -62,7 +63,7 @@ public abstract class AbstractJDBCImporter extends 
AbstractShardingScalingExecut
     protected AbstractJDBCImporter(final ImporterConfiguration importerConfig, 
final DataSourceManager dataSourceManager) {
         this.importerConfig = importerConfig;
         this.dataSourceManager = dataSourceManager;
-        sqlBuilder = createSQLBuilder(importerConfig.getShardingColumnsMap());
+        scalingSqlBuilder = 
createSQLBuilder(importerConfig.getShardingColumnsMap());
     }
     
     /**
@@ -71,7 +72,7 @@ public abstract class AbstractJDBCImporter extends 
AbstractShardingScalingExecut
      * @param shardingColumnsMap sharding columns map
      * @return SQL builder
      */
-    protected abstract AbstractSQLBuilder createSQLBuilder(Map<String, 
Set<String>> shardingColumnsMap);
+    protected abstract ScalingSQLBuilder createSQLBuilder(Map<String, 
Set<String>> shardingColumnsMap);
     
     @Override
     public final void start() {
@@ -153,7 +154,7 @@ public abstract class AbstractJDBCImporter extends 
AbstractShardingScalingExecut
     }
     
     private void executeBatchInsert(final Connection connection, final 
List<DataRecord> dataRecords) throws SQLException {
-        String insertSql = sqlBuilder.buildInsertSQL(dataRecords.get(0));
+        String insertSql = 
scalingSqlBuilder.buildInsertSQL(dataRecords.get(0));
         PreparedStatement ps = connection.prepareStatement(insertSql);
         ps.setQueryTimeout(30);
         for (DataRecord each : dataRecords) {
@@ -174,7 +175,7 @@ public abstract class AbstractJDBCImporter extends 
AbstractShardingScalingExecut
     private void executeUpdate(final Connection connection, final DataRecord 
record) throws SQLException {
         List<Column> conditionColumns = 
RecordUtil.extractConditionColumns(record, 
importerConfig.getShardingColumnsMap().get(record.getTableName()));
         List<Column> updatedColumns = RecordUtil.extractUpdatedColumns(record);
-        String updateSql = sqlBuilder.buildUpdateSQL(record, conditionColumns);
+        String updateSql = scalingSqlBuilder.buildUpdateSQL(record, 
conditionColumns);
         PreparedStatement ps = connection.prepareStatement(updateSql);
         for (int i = 0; i < updatedColumns.size(); i++) {
             ps.setObject(i + 1, updatedColumns.get(i).getValue());
@@ -190,7 +191,7 @@ public abstract class AbstractJDBCImporter extends 
AbstractShardingScalingExecut
     
     private void executeBatchDelete(final Connection connection, final 
List<DataRecord> dataRecords) throws SQLException {
         List<Column> conditionColumns = 
RecordUtil.extractConditionColumns(dataRecords.get(0), 
importerConfig.getShardingColumnsMap().get(dataRecords.get(0).getTableName()));
-        String deleteSQL = sqlBuilder.buildDeleteSQL(dataRecords.get(0), 
conditionColumns);
+        String deleteSQL = 
scalingSqlBuilder.buildDeleteSQL(dataRecords.get(0), conditionColumns);
         PreparedStatement ps = connection.prepareStatement(deleteSQL);
         ps.setQueryTimeout(30);
         for (DataRecord each : dataRecords) {
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/AbstractScalingSQLBuilder.java
similarity index 90%
rename from 
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java
rename to 
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/AbstractScalingSQLBuilder.java
index d40e47a..6f9edb742 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/AbstractScalingSQLBuilder.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.execute.executor.importer;
+package org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder;
 
 import com.google.common.collect.Collections2;
 import lombok.AccessLevel;
@@ -35,7 +35,7 @@ import java.util.concurrent.ConcurrentMap;
  * Abstract SQL builder.
  */
 @RequiredArgsConstructor
-public abstract class AbstractSQLBuilder {
+public abstract class AbstractScalingSQLBuilder implements ScalingSQLBuilder {
     
     private static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
     
@@ -72,12 +72,7 @@ public abstract class AbstractSQLBuilder {
         return new 
StringBuilder().append(getLeftIdentifierQuoteString()).append(item).append(getRightIdentifierQuoteString());
     }
     
-    /**
-     * Build insert SQL.
-     *
-     * @param dataRecord data record
-     * @return insert SQL
-     */
+    @Override
     public String buildInsertSQL(final DataRecord dataRecord) {
         String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX + 
dataRecord.getTableName();
         if (!sqlCacheMap.containsKey(sqlCacheKey)) {
@@ -98,13 +93,7 @@ public abstract class AbstractSQLBuilder {
         return String.format("INSERT INTO %s(%s) VALUES(%s)", 
quote(tableName), columnsLiteral, holder);
     }
     
-    /**
-     * Build update SQL.
-     *
-     * @param dataRecord data record
-     * @param conditionColumns condition columns
-     * @return update SQL
-     */
+    @Override
     public String buildUpdateSQL(final DataRecord dataRecord, final 
Collection<Column> conditionColumns) {
         String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + 
dataRecord.getTableName();
         if (!sqlCacheMap.containsKey(sqlCacheKey)) {
@@ -126,13 +115,7 @@ public abstract class AbstractSQLBuilder {
         return Collections2.filter(columns, Column::isUpdated);
     }
     
-    /**
-     * Build delete SQL.
-     *
-     * @param dataRecord data record
-     * @param conditionColumns condition columns
-     * @return delete SQL
-     */
+    @Override
     public String buildDeleteSQL(final DataRecord dataRecord, final 
Collection<Column> conditionColumns) {
         String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX + 
dataRecord.getTableName();
         if (!sqlCacheMap.containsKey(sqlCacheKey)) {
@@ -154,13 +137,13 @@ public abstract class AbstractSQLBuilder {
         return where.toString();
     }
     
-    /**
-     * Build count SQL.
-     *
-     * @param tableName table name
-     * @return count SQL
-     */
+    @Override
     public String buildCountSQL(final String tableName) {
         return String.format("SELECT COUNT(*) FROM %s", quote(tableName));
     }
+    
+    @Override
+    public String buildSplitByPrimaryKeyRangeSQL(final String tableName, final 
String primaryKey) {
+        return String.format("SELECT MAX(%s) FROM (SELECT %s FROM %s WHERE 
%s>=? limit ?) t", quote(primaryKey), quote(primaryKey), quote(tableName), 
quote(primaryKey));
+    }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilder.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilder.java
new file mode 100644
index 0000000..09c98fa
--- /dev/null
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder;
+
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+
+import java.util.Collection;
+
+/**
+ * Scaling SQL builder.
+ */
+public interface ScalingSQLBuilder {
+    
+    /**
+     * Build insert SQL.
+     *
+     * @param dataRecord data record
+     * @return insert SQL
+     */
+    String buildInsertSQL(DataRecord dataRecord);
+    
+    /**
+     * Build update SQL.
+     *
+     * @param dataRecord data record
+     * @param conditionColumns condition columns
+     * @return update SQL
+     */
+    String buildUpdateSQL(DataRecord dataRecord, Collection<Column> 
conditionColumns);
+    
+    /**
+     * Build delete SQL.
+     *
+     * @param dataRecord data record
+     * @param conditionColumns condition columns
+     * @return delete SQL
+     */
+    String buildDeleteSQL(DataRecord dataRecord, Collection<Column> 
conditionColumns);
+    
+    /**
+     * Build count SQL.
+     *
+     * @param tableName table name
+     * @return count SQL
+     */
+    String buildCountSQL(String tableName);
+    
+    /**
+     * Build split by primary key range SQL.
+     *
+     * @param tableName table name
+     * @param primaryKey primary key
+     * @return split SQL
+     */
+    String buildSplitByPrimaryKeyRangeSQL(String tableName, String primaryKey);
+}
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilderFactory.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilderFactory.java
new file mode 100644
index 0000000..40b651c
--- /dev/null
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilderFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder;
+
+import com.google.common.collect.Maps;
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
+import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
+
+import java.util.Map;
+
+/**
+ * Scaling SQL builder factory.
+ */
+public final class ScalingSQLBuilderFactory {
+    
+    /**
+     * New instance of SQL builder.
+     *
+     * @param databaseType database type
+     * @return SQL builder
+     */
+    @SneakyThrows(ReflectiveOperationException.class)
+    public static ScalingSQLBuilder newInstance(final String databaseType) {
+        ScalingEntry scalingEntry = 
ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
+        return 
scalingEntry.getSQLBuilderClass().getConstructor(Map.class).newInstance(Maps.newHashMap());
+    }
+}
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyChecker.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyChecker.java
index a5efe04..49f8cb9 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyChecker.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyChecker.java
@@ -23,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceFactory;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceWrapper;
 import org.apache.shardingsphere.scaling.core.exception.DataCheckFailException;
-import 
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
 
 import javax.sql.DataSource;
@@ -85,5 +85,5 @@ public abstract class AbstractDataConsistencyChecker 
implements DataConsistencyC
         return 
dataSourceFactory.newInstance(shardingScalingJob.getScalingConfig().getRuleConfiguration().getTarget().unwrap());
     }
     
-    protected abstract AbstractSQLBuilder getSqlBuilder();
+    protected abstract ScalingSQLBuilder getSqlBuilder();
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
index 50dcfff..7f3fb4c 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
@@ -68,7 +68,7 @@ public final class ShardingScalingJobPreparer {
                 syncPositionResumer.resumePosition(shardingScalingJob, 
dataSourceManager, resumeBreakPointManager);
             } else {
                 initIncrementalDataTasks(databaseType, shardingScalingJob, 
dataSourceManager);
-                initInventoryDataTasks(shardingScalingJob, dataSourceManager);
+                initInventoryDataTasks(databaseType, shardingScalingJob, 
dataSourceManager);
                 syncPositionResumer.persistPosition(shardingScalingJob, 
resumeBreakPointManager);
             }
             
shardingScalingJob.setDataConsistencyChecker(initDataConsistencyChecker(databaseType,
 shardingScalingJob));
@@ -90,10 +90,10 @@ public final class ShardingScalingJobPreparer {
         
dataSourceChecker.checkVariable(dataSourceManager.getSourceDataSources().values());
     }
     
-    private void initInventoryDataTasks(final ShardingScalingJob 
shardingScalingJob, final DataSourceManager dataSourceManager) {
+    private void initInventoryDataTasks(final String databaseType, final 
ShardingScalingJob shardingScalingJob, final DataSourceManager 
dataSourceManager) {
         List<ScalingTask> allInventoryDataTasks = new LinkedList<>();
         for (SyncConfiguration each : shardingScalingJob.getSyncConfigs()) {
-            
allInventoryDataTasks.addAll(inventoryDataTaskSplitter.splitInventoryData(each, 
dataSourceManager));
+            
allInventoryDataTasks.addAll(inventoryDataTaskSplitter.splitInventoryData(databaseType,
 each, dataSourceManager));
         }
         
shardingScalingJob.getInventoryDataTasks().addAll(allInventoryDataTasks);
     }
@@ -102,7 +102,7 @@ public final class ShardingScalingJobPreparer {
         for (SyncConfiguration each : shardingScalingJob.getSyncConfigs()) {
             DataSourceConfiguration dataSourceConfig = 
each.getDumperConfig().getDataSourceConfig();
             
each.getDumperConfig().setPositionManager(PositionManagerFactory.newInstance(databaseType,
 dataSourceManager.getDataSource(dataSourceConfig)));
-            
shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each.getConcurrency(),
 each.getDumperConfig(), each.getImporterConfig()));
+            
shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each.getJobConfig().getConcurrency(),
 each.getDumperConfig(), each.getImporterConfig()));
         }
     }
     
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
index a9e6d51..f64ce7a 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
@@ -78,7 +78,7 @@ public final class SyncPositionResumer {
         result.setTableName(splitTable[0].split("\\.")[1]);
         result.setPositionManager(entry.getValue());
         if (2 == splitTable.length) {
-            result.setSpiltNum(Integer.parseInt(splitTable[1]));
+            result.setShardingItem(Integer.parseInt(splitTable[1]));
         }
         
result.setPrimaryKey(metaDataManager.getTableMetaData(result.getTableName()).getPrimaryKeyColumns().get(0));
         return result;
@@ -94,7 +94,7 @@ public final class SyncPositionResumer {
     private void resumeIncrementalPosition(final ShardingScalingJob 
shardingScalingJob, final ResumeBreakPointManager resumeBreakPointManager) {
         for (SyncConfiguration each : shardingScalingJob.getSyncConfigs()) {
             
each.getDumperConfig().setPositionManager(resumeBreakPointManager.getIncrementalPositionManagerMap().get(each.getDumperConfig().getDataSourceName()));
-            
shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each.getConcurrency(),
 each.getDumperConfig(), each.getImporterConfig()));
+            
shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each.getJobConfig().getConcurrency(),
 each.getDumperConfig(), each.getImporterConfig()));
         }
     }
     
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
index b522cc6..3b66c38 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
@@ -24,6 +24,8 @@ import 
org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguratio
 import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilderFactory;
 import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
@@ -53,25 +55,27 @@ public final class InventoryDataTaskSplitter {
     /**
      * Split inventory data to multi-tasks.
      *
+     * @param databaseType database type
      * @param syncConfig synchronize configuration
      * @param dataSourceManager data source manager
      * @return split inventory data task
      */
-    public Collection<ScalingTask> splitInventoryData(final SyncConfiguration 
syncConfig, final DataSourceManager dataSourceManager) {
+    public Collection<ScalingTask> splitInventoryData(final String 
databaseType, final SyncConfiguration syncConfig, final DataSourceManager 
dataSourceManager) {
         Collection<ScalingTask> result = new LinkedList<>();
-        for (InventoryDumperConfiguration each : 
splitDumperConfig(syncConfig.getConcurrency(), syncConfig.getDumperConfig(), 
dataSourceManager)) {
+        for (InventoryDumperConfiguration each : 
splitDumperConfig(databaseType, syncConfig.getJobConfig().getShardingSize(), 
syncConfig.getDumperConfig(), dataSourceManager)) {
             result.add(syncTaskFactory.createInventoryDataSyncTask(each, 
syncConfig.getImporterConfig()));
         }
         return result;
     }
     
-    private Collection<InventoryDumperConfiguration> splitDumperConfig(final 
int concurrency, final DumperConfiguration dumperConfig, final 
DataSourceManager dataSourceManager) {
+    private Collection<InventoryDumperConfiguration> splitDumperConfig(
+            final String databaseType, final int shardingSize, final 
DumperConfiguration dumperConfig, final DataSourceManager dataSourceManager) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
         DataSource dataSource = 
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
         MetaDataManager metaDataManager = new MetaDataManager(dataSource);
         for (InventoryDumperConfiguration each : splitByTable(dumperConfig)) {
             if (isSpiltByPrimaryKeyRange(each, metaDataManager)) {
-                result.addAll(splitByPrimaryKeyRange(concurrency, each, 
metaDataManager, dataSource));
+                result.addAll(splitByPrimaryKeyRange(databaseType, 
shardingSize, each, metaDataManager, dataSource));
             } else {
                 result.add(each);
             }
@@ -117,31 +121,33 @@ public final class InventoryDataTaskSplitter {
         return Types.INTEGER != columnType && Types.BIGINT != columnType && 
Types.SMALLINT != columnType && Types.TINYINT != columnType;
     }
     
-    private Collection<InventoryDumperConfiguration> 
splitByPrimaryKeyRange(final int concurrency, final 
InventoryDumperConfiguration inventoryDumperConfig,
-                                                                            
final MetaDataManager metaDataManager, final DataSource dataSource) {
+    private Collection<InventoryDumperConfiguration> splitByPrimaryKeyRange(
+            final String databaseType, final int shardingSize, final 
InventoryDumperConfiguration inventoryDumperConfig, final MetaDataManager 
metaDataManager, final DataSource dataSource) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
         String tableName = inventoryDumperConfig.getTableName();
         String primaryKey = 
metaDataManager.getTableMetaData(tableName).getPrimaryKeyColumns().get(0);
+        ScalingSQLBuilder scalingSqlBuilder = 
ScalingSQLBuilderFactory.newInstance(databaseType);
         inventoryDumperConfig.setPrimaryKey(primaryKey);
-        try (Connection connection = dataSource.getConnection()) {
-            PreparedStatement ps = 
connection.prepareStatement(String.format("SELECT MIN(%s),MAX(%s) FROM %s LIMIT 
1", primaryKey, primaryKey, inventoryDumperConfig.getTableName()));
-            ResultSet rs = ps.executeQuery();
-            rs.next();
-            long min = rs.getLong(1);
-            long max = rs.getLong(2);
-            long step = (max - min) / concurrency;
-            for (int i = 0; i < concurrency && min <= max; i++) {
-                InventoryDumperConfiguration splitDumperConfig = new 
InventoryDumperConfiguration(inventoryDumperConfig);
-                if (i < concurrency - 1) {
-                    splitDumperConfig.setPositionManager(new 
PositionManager(new PrimaryKeyPosition(min, min + step)));
-                    min += step + 1;
-                } else {
-                    splitDumperConfig.setPositionManager(new 
PositionManager(new PrimaryKeyPosition(min, max)));
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement ps = 
connection.prepareStatement(scalingSqlBuilder.buildSplitByPrimaryKeyRangeSQL(tableName,
 primaryKey))) {
+            long beginId = 0;
+            for (int i = 0; i < Integer.MAX_VALUE; i++) {
+                ps.setLong(1, beginId);
+                ps.setLong(2, shardingSize);
+                try (ResultSet rs = ps.executeQuery()) {
+                    rs.next();
+                    long endId = rs.getLong(1);
+                    if (endId == 0) {
+                        break;
+                    }
+                    InventoryDumperConfiguration splitDumperConfig = new 
InventoryDumperConfiguration(inventoryDumperConfig);
+                    splitDumperConfig.setPositionManager(new 
PositionManager(new PrimaryKeyPosition(beginId, endId)));
+                    splitDumperConfig.setShardingItem(i);
+                    splitDumperConfig.setPrimaryKey(primaryKey);
+                    splitDumperConfig.setTableName(tableName);
+                    result.add(splitDumperConfig);
+                    beginId = endId + 1;
                 }
-                splitDumperConfig.setSpiltNum(i);
-                splitDumperConfig.setPrimaryKey(primaryKey);
-                splitDumperConfig.setTableName(tableName);
-                result.add(splitDumperConfig);
             }
         } catch (final SQLException ex) {
             throw new PrepareFailedException(String.format("Split task for 
table %s by primary key %s error", inventoryDumperConfig.getTableName(), 
primaryKey), ex);
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
index 885b4c6..ca8cac7 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
@@ -68,7 +68,7 @@ public final class InventoryDataScalingTask extends 
AbstractShardingScalingExecu
     
     private String generateSyncTaskId(final InventoryDumperConfiguration 
inventoryDumperConfig) {
         String result = String.format("%s.%s", 
inventoryDumperConfig.getDataSourceName(), 
inventoryDumperConfig.getTableName());
-        return null == inventoryDumperConfig.getSpiltNum() ? result : result + 
"#" + inventoryDumperConfig.getSpiltNum();
+        return null == inventoryDumperConfig.getShardingItem() ? result : 
result + "#" + inventoryDumperConfig.getShardingItem();
     }
     
     @Override
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
index c765215..692504e 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
@@ -21,6 +21,7 @@ import 
org.apache.shardingsphere.infra.database.type.DatabaseTypeAwareSPI;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import 
org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
@@ -71,4 +72,11 @@ public interface ScalingEntry extends DatabaseTypeAwareSPI {
      * @return data consistency checker type
      */
     Class<? extends DataConsistencyChecker> getDataConsistencyCheckerClass();
+    
+    /**
+     * Get SQL builder class.
+     *
+     * @return SQL builder type
+     */
+    Class<? extends ScalingSQLBuilder> getSQLBuilderClass();
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java
index 6d00841..76f74ee 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java
@@ -83,7 +83,7 @@ public final class SyncConfigurationUtil {
         for (Entry<String, Map<String, String>> entry : 
dataSourceTableNameMap.entrySet()) {
             DumperConfiguration dumperConfig = 
createDumperConfig(entry.getKey(), 
sourceDataSource.get(entry.getKey()).getProps(), entry.getValue());
             ImporterConfiguration importerConfig = 
createImporterConfig(scalingConfig, shardingColumnsMap);
-            result.add(new 
SyncConfiguration(scalingConfig.getJobConfiguration().getConcurrency(), 
dumperConfig, importerConfig));
+            result.add(new 
SyncConfiguration(scalingConfig.getJobConfiguration(), dumperConfig, 
importerConfig));
         }
         return result;
     }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
index fc25ab4..4d19ce5 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
@@ -28,6 +28,7 @@ import 
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord
 import 
org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
 import org.junit.Before;
 import org.junit.Test;
@@ -66,7 +67,7 @@ public final class AbstractJDBCImporterTest {
     private DataSourceManager dataSourceManager;
     
     @Mock
-    private AbstractSQLBuilder sqlBuilder;
+    private ScalingSQLBuilder scalingSqlBuilder;
     
     @Mock
     private DataSourceConfiguration dataSourceConfig;
@@ -90,8 +91,8 @@ public final class AbstractJDBCImporterTest {
         jdbcImporter = new AbstractJDBCImporter(mockImporterConfiguration(), 
dataSourceManager) {
             
             @Override
-            protected AbstractSQLBuilder createSQLBuilder(final Map<String, 
Set<String>> shardingColumnsMap) {
-                return sqlBuilder;
+            protected ScalingSQLBuilder createSQLBuilder(final Map<String, 
Set<String>> shardingColumnsMap) {
+                return scalingSqlBuilder;
             }
         };
         jdbcImporter.setChannel(channel);
@@ -102,7 +103,7 @@ public final class AbstractJDBCImporterTest {
     @Test
     public void assertWriteInsertDataRecord() throws SQLException {
         DataRecord insertRecord = getDataRecord("INSERT");
-        when(sqlBuilder.buildInsertSQL(insertRecord)).thenReturn(INSERT_SQL);
+        
when(scalingSqlBuilder.buildInsertSQL(insertRecord)).thenReturn(INSERT_SQL);
         
when(connection.prepareStatement(INSERT_SQL)).thenReturn(preparedStatement);
         when(channel.fetchRecords(anyInt(), 
anyInt())).thenReturn(mockRecords(insertRecord));
         jdbcImporter.run();
@@ -115,7 +116,7 @@ public final class AbstractJDBCImporterTest {
     @Test
     public void assertDeleteDataRecord() throws SQLException {
         DataRecord deleteRecord = getDataRecord("DELETE");
-        when(sqlBuilder.buildDeleteSQL(deleteRecord, 
mockConditionColumns(deleteRecord))).thenReturn(DELETE_SQL);
+        when(scalingSqlBuilder.buildDeleteSQL(deleteRecord, 
mockConditionColumns(deleteRecord))).thenReturn(DELETE_SQL);
         
when(connection.prepareStatement(DELETE_SQL)).thenReturn(preparedStatement);
         when(channel.fetchRecords(anyInt(), 
anyInt())).thenReturn(mockRecords(deleteRecord));
         jdbcImporter.run();
@@ -127,7 +128,7 @@ public final class AbstractJDBCImporterTest {
     @Test
     public void assertUpdateDataRecord() throws SQLException {
         DataRecord updateRecord = getDataRecord("UPDATE");
-        when(sqlBuilder.buildUpdateSQL(updateRecord, 
mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
+        when(scalingSqlBuilder.buildUpdateSQL(updateRecord, 
mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
         
when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
         when(channel.fetchRecords(anyInt(), 
anyInt())).thenReturn(mockRecords(updateRecord));
         jdbcImporter.run();
@@ -141,7 +142,7 @@ public final class AbstractJDBCImporterTest {
     @Test
     public void assertUpdatePrimaryKeyDataRecord() throws SQLException {
         DataRecord updateRecord = getUpdatePrimaryKeyDataRecord();
-        when(sqlBuilder.buildUpdateSQL(updateRecord, 
mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
+        when(scalingSqlBuilder.buildUpdateSQL(updateRecord, 
mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
         
when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
         when(channel.fetchRecords(anyInt(), 
anyInt())).thenReturn(mockRecords(updateRecord));
         jdbcImporter.run();
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
index bb86ec7..43d1b33 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
@@ -22,8 +22,9 @@ import com.google.common.collect.Sets;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
+import org.apache.shardingsphere.scaling.core.fixture.FixtureScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collection;
@@ -33,53 +34,37 @@ import static org.junit.Assert.assertThat;
 
 public final class AbstractSqlBuilderTest {
     
-    private AbstractSQLBuilder sqlBuilder;
-    
-    @Before
-    public void setUp() {
-        sqlBuilder = new AbstractSQLBuilder(Maps.newHashMap()) {
-            
-            @Override
-            protected String getLeftIdentifierQuoteString() {
-                return "`";
-            }
-    
-            @Override
-            protected String getRightIdentifierQuoteString() {
-                return "`";
-            }
-        };
-    }
+    private final ScalingSQLBuilder scalingSqlBuilder = new 
FixtureScalingSQLBuilder(Maps.newHashMap());
     
     @Test
     public void assertBuildInsertSQL() {
-        String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t1"));
+        String actual = scalingSqlBuilder.buildInsertSQL(mockDataRecord("t1"));
         assertThat(actual, is("INSERT INTO `t1`(`id`,`sc`,`c1`,`c2`,`c3`) 
VALUES(?,?,?,?,?)"));
     }
     
     @Test
     public void assertBuildUpdateSQLWithPrimaryKey() {
-        String actual = sqlBuilder.buildUpdateSQL(mockDataRecord("t2"), 
RecordUtil.extractPrimaryColumns(mockDataRecord("t2")));
+        String actual = scalingSqlBuilder.buildUpdateSQL(mockDataRecord("t2"), 
RecordUtil.extractPrimaryColumns(mockDataRecord("t2")));
         assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? 
WHERE `id` = ?"));
     }
     
     @Test
     public void assertBuildUpdateSQLWithShardingColumns() {
         DataRecord dataRecord = mockDataRecord("t2");
-        String actual = sqlBuilder.buildUpdateSQL(dataRecord, 
mockConditionColumns(dataRecord));
+        String actual = scalingSqlBuilder.buildUpdateSQL(dataRecord, 
mockConditionColumns(dataRecord));
         assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? 
WHERE `id` = ? and `sc` = ?"));
     }
     
     @Test
     public void assertBuildDeleteSQLWithPrimaryKey() {
-        String actual = sqlBuilder.buildDeleteSQL(mockDataRecord("t3"), 
RecordUtil.extractPrimaryColumns(mockDataRecord("t3")));
+        String actual = scalingSqlBuilder.buildDeleteSQL(mockDataRecord("t3"), 
RecordUtil.extractPrimaryColumns(mockDataRecord("t3")));
         assertThat(actual, is("DELETE FROM `t3` WHERE `id` = ?"));
     }
     
     @Test
     public void assertBuildDeleteSQLWithConditionColumns() {
         DataRecord dataRecord = mockDataRecord("t3");
-        String actual = sqlBuilder.buildDeleteSQL(dataRecord, 
mockConditionColumns(dataRecord));
+        String actual = scalingSqlBuilder.buildDeleteSQL(dataRecord, 
mockConditionColumns(dataRecord));
         assertThat(actual, is("DELETE FROM `t3` WHERE `id` = ? and `sc` = ?"));
     }
     
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
index 243b5d3..b35181d 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.scaling.core.fixture;
 
 import com.google.common.collect.Maps;
-import 
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
 import 
org.apache.shardingsphere.scaling.core.job.check.AbstractDataConsistencyChecker;
 import 
org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
@@ -44,17 +44,7 @@ public final class FixtureDataConsistencyChecker extends 
AbstractDataConsistency
     }
     
     @Override
-    protected AbstractSQLBuilder getSqlBuilder() {
-        return new AbstractSQLBuilder(Maps.newHashMap()) {
-            @Override
-            protected String getLeftIdentifierQuoteString() {
-                return "`";
-            }
-            
-            @Override
-            protected String getRightIdentifierQuoteString() {
-                return "`";
-            }
-        };
+    protected ScalingSQLBuilder getSqlBuilder() {
+        return new FixtureScalingSQLBuilder(Maps.newHashMap());
     }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
index 3c47c07..f834cbc 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.core.fixture;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import 
org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
@@ -58,6 +59,11 @@ public final class FixtureH2ScalingEntry implements 
ScalingEntry {
     }
     
     @Override
+    public Class<? extends ScalingSQLBuilder> getSQLBuilderClass() {
+        return FixtureScalingSQLBuilder.class;
+    }
+    
+    @Override
     public String getDatabaseType() {
         return "H2";
     }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/InventoryDumperConfiguration.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureScalingSQLBuilder.java
similarity index 53%
copy from 
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/InventoryDumperConfiguration.java
copy to 
shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureScalingSQLBuilder.java
index d004c52..1ed5372 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/InventoryDumperConfiguration.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureScalingSQLBuilder.java
@@ -15,27 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.config;
+package org.apache.shardingsphere.scaling.core.fixture;
 
-import lombok.Getter;
-import lombok.Setter;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.AbstractScalingSQLBuilder;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 
-/**
- * Inventory dumper configuration.
- */
-@Getter
-@Setter
-public final class InventoryDumperConfiguration extends DumperConfiguration {
-    
-    private String tableName;
+import java.util.Map;
+import java.util.Set;
+
+public final class FixtureScalingSQLBuilder extends AbstractScalingSQLBuilder 
implements ScalingSQLBuilder {
     
-    private String primaryKey;
+    public FixtureScalingSQLBuilder(final Map<String, Set<String>> 
shardingColumnsMap) {
+        super(shardingColumnsMap);
+    }
     
-    private Integer spiltNum;
+    @Override
+    protected String getLeftIdentifierQuoteString() {
+        return "`";
+    }
     
-    public InventoryDumperConfiguration(final DumperConfiguration 
dumperConfig) {
-        setDataSourceName(dumperConfig.getDataSourceName());
-        setDataSourceConfig(dumperConfig.getDataSourceConfig());
-        setTableNameMap(dumperConfig.getTableNameMap());
+    @Override
+    protected String getRightIdentifierQuoteString() {
+        return "`";
     }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
index 9a61cb8..f3d75a6 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
@@ -19,10 +19,12 @@ package 
org.apache.shardingsphere.scaling.core.job.preparer.splitter;
 
 import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
-import 
org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
-import 
org.apache.shardingsphere.scaling.core.config.datasource.DataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import 
org.apache.shardingsphere.scaling.core.config.datasource.DataSourceConfiguration;
+import 
org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 import org.junit.After;
 import org.junit.Before;
@@ -34,6 +36,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -48,6 +51,8 @@ public final class InventoryDataTaskSplitterTest {
     
     private static final String PASSWORD = "password";
     
+    private static final String DATABASE_TYPE = "H2";
+    
     private SyncConfiguration syncConfig;
     
     private DataSourceManager dataSourceManager;
@@ -58,7 +63,7 @@ public final class InventoryDataTaskSplitterTest {
     public void setUp() {
         DumperConfiguration dumperConfig = mockDumperConfig();
         ImporterConfiguration importerConfig = new ImporterConfiguration();
-        syncConfig = new SyncConfiguration(3, dumperConfig, importerConfig);
+        syncConfig = new SyncConfiguration(new JobConfiguration(), 
dumperConfig, importerConfig);
         dataSourceManager = new DataSourceManager();
         inventoryDataTaskSplitter = new InventoryDataTaskSplitter();
     }
@@ -70,16 +75,19 @@ public final class InventoryDataTaskSplitterTest {
     
     @Test
     public void assertSplitInventoryDataWithIntPrimary() throws SQLException {
+        syncConfig.getJobConfig().setShardingSize(10);
         initIntPrimaryEnvironment(syncConfig.getDumperConfig());
-        Collection<ScalingTask> actual = 
inventoryDataTaskSplitter.splitInventoryData(syncConfig, dataSourceManager);
+        List<ScalingTask> actual = (List<ScalingTask>) 
inventoryDataTaskSplitter.splitInventoryData(DATABASE_TYPE, syncConfig, 
dataSourceManager);
         assertNotNull(actual);
-        assertThat(actual.size(), is(3));
+        assertThat(actual.size(), is(10));
+        assertThat(((PrimaryKeyPosition) 
actual.get(9).getPositionManager().getPosition()).getBeginValue(), is(91L));
+        assertThat(((PrimaryKeyPosition) 
actual.get(9).getPositionManager().getPosition()).getEndValue(), is(100L));
     }
     
     @Test
     public void assertSplitInventoryDataWithCharPrimary() throws SQLException {
         initCharPrimaryEnvironment(syncConfig.getDumperConfig());
-        Collection<ScalingTask> actual = 
inventoryDataTaskSplitter.splitInventoryData(syncConfig, dataSourceManager);
+        Collection<ScalingTask> actual = 
inventoryDataTaskSplitter.splitInventoryData(DATABASE_TYPE, syncConfig, 
dataSourceManager);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }
@@ -87,7 +95,7 @@ public final class InventoryDataTaskSplitterTest {
     @Test
     public void assertSplitInventoryDataWithUnionPrimary() throws SQLException 
{
         initUnionPrimaryEnvironment(syncConfig.getDumperConfig());
-        Collection<ScalingTask> actual = 
inventoryDataTaskSplitter.splitInventoryData(syncConfig, dataSourceManager);
+        Collection<ScalingTask> actual = 
inventoryDataTaskSplitter.splitInventoryData(DATABASE_TYPE, syncConfig, 
dataSourceManager);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }
@@ -95,7 +103,7 @@ public final class InventoryDataTaskSplitterTest {
     @Test
     public void assertSplitInventoryDataWithoutPrimary() throws SQLException {
         initNoPrimaryEnvironment(syncConfig.getDumperConfig());
-        Collection<ScalingTask> actual = 
inventoryDataTaskSplitter.splitInventoryData(syncConfig, dataSourceManager);
+        Collection<ScalingTask> actual = 
inventoryDataTaskSplitter.splitInventoryData(DATABASE_TYPE, syncConfig, 
dataSourceManager);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }
@@ -103,17 +111,19 @@ public final class InventoryDataTaskSplitterTest {
     private void initIntPrimaryEnvironment(final DumperConfiguration 
dumperConfig) throws SQLException {
         DataSource dataSource = 
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
         try (Connection connection = dataSource.getConnection();
-            Statement statement = connection.createStatement()) {
+             Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
             statement.execute("CREATE TABLE t_order (id INT PRIMARY KEY, 
user_id VARCHAR(12))");
-            statement.execute("INSERT INTO t_order (id, user_id) VALUES (1, 
'xxx'), (999, 'yyy')");
+            for (int i = 1; i <= 100; i++) {
+                statement.execute(String.format("INSERT INTO t_order (id, 
user_id) VALUES (%d, 'x')", i));
+            }
         }
     }
     
     private void initCharPrimaryEnvironment(final DumperConfiguration 
dumperConfig) throws SQLException {
         DataSource dataSource = 
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
         try (Connection connection = dataSource.getConnection();
-            Statement statement = connection.createStatement()) {
+             Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
             statement.execute("CREATE TABLE t_order (id CHAR(3) PRIMARY KEY, 
user_id VARCHAR(12))");
             statement.execute("INSERT INTO t_order (id, user_id) VALUES ('1', 
'xxx'), ('999', 'yyy')");
@@ -123,7 +133,7 @@ public final class InventoryDataTaskSplitterTest {
     private void initUnionPrimaryEnvironment(final DumperConfiguration 
dumperConfig) throws SQLException {
         DataSource dataSource = 
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
         try (Connection connection = dataSource.getConnection();
-            Statement statement = connection.createStatement()) {
+             Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
             statement.execute("CREATE TABLE t_order (id INT, user_id 
VARCHAR(12), PRIMARY KEY (id, user_id))");
             statement.execute("INSERT INTO t_order (id, user_id) VALUES (1, 
'xxx'), (999, 'yyy')");
@@ -133,7 +143,7 @@ public final class InventoryDataTaskSplitterTest {
     private void initNoPrimaryEnvironment(final DumperConfiguration 
dumperConfig) throws SQLException {
         DataSource dataSource = 
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
         try (Connection connection = dataSource.getConnection();
-            Statement statement = connection.createStatement()) {
+             Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
             statement.execute("CREATE TABLE t_order (id INT, user_id 
VARCHAR(12))");
             statement.execute("INSERT INTO t_order (id, user_id) VALUES (1, 
'xxx'), (999, 'yyy')");
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
index 9d25a50..60a4c7d 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.scaling.core.job.task.inventory;
 import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
 import 
org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import 
org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import 
org.apache.shardingsphere.scaling.core.config.datasource.DataSourceConfiguration;
@@ -58,7 +59,7 @@ public final class InventoryDataScalingTaskTest {
         DumperConfiguration dumperConfig = mockDumperConfig();
         ImporterConfiguration importerConfig = mockImporterConfig();
         ScalingContext.getInstance().init(new ServerConfiguration());
-        syncConfig = new SyncConfiguration(3, dumperConfig, importerConfig);
+        syncConfig = new SyncConfiguration(new JobConfiguration(), 
dumperConfig, importerConfig);
         dataSourceManager = new DataSourceManager();
     }
     
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
index bef398a..092b4df 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.mysql;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import 
org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
@@ -30,6 +31,7 @@ import 
org.apache.shardingsphere.scaling.mysql.component.MySQLDataSourceChecker;
 import org.apache.shardingsphere.scaling.mysql.component.MySQLImporter;
 import org.apache.shardingsphere.scaling.mysql.component.MySQLJdbcDumper;
 import org.apache.shardingsphere.scaling.mysql.component.MySQLPositionManager;
+import 
org.apache.shardingsphere.scaling.mysql.component.MySQLScalingSQLBuilder;
 
 /**
  * MySQL scaling entry.
@@ -67,6 +69,11 @@ public final class MySQLScalingEntry implements ScalingEntry 
{
     }
     
     @Override
+    public Class<? extends ScalingSQLBuilder> getSQLBuilderClass() {
+        return MySQLScalingSQLBuilder.class;
+    }
+    
+    @Override
     public String getDatabaseType() {
         return "MySQL";
     }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataConsistencyChecker.java
 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataConsistencyChecker.java
index d05581e..bdaaf1a 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataConsistencyChecker.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataConsistencyChecker.java
@@ -97,7 +97,7 @@ public final class MySQLDataConsistencyChecker extends 
AbstractDataConsistencyCh
     }
     
     @Override
-    protected MySQLSQLBuilder getSqlBuilder() {
-        return new MySQLSQLBuilder(Maps.newHashMap());
+    protected MySQLScalingSQLBuilder getSqlBuilder() {
+        return new MySQLScalingSQLBuilder(Maps.newHashMap());
     }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLImporter.java
 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLImporter.java
index 8de96a1..39f2155 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLImporter.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLImporter.java
@@ -21,7 +21,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
-import 
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.utils.JDBCUtil;
 
 import java.util.Map;
@@ -38,7 +38,7 @@ public final class MySQLImporter extends AbstractJDBCImporter 
{
     }
     
     @Override
-    protected AbstractSQLBuilder createSQLBuilder(final Map<String, 
Set<String>> shardingColumnsMap) {
-        return new MySQLSQLBuilder(shardingColumnsMap);
+    protected ScalingSQLBuilder createSQLBuilder(final Map<String, 
Set<String>> shardingColumnsMap) {
+        return new MySQLScalingSQLBuilder(shardingColumnsMap);
     }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLSQLBuilder.java
 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLScalingSQLBuilder.java
similarity index 87%
rename from 
shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLSQLBuilder.java
rename to 
shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLScalingSQLBuilder.java
index 608dc62..4c308b6 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLSQLBuilder.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLScalingSQLBuilder.java
@@ -17,9 +17,10 @@
 
 package org.apache.shardingsphere.scaling.mysql.component;
 
-import 
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.AbstractScalingSQLBuilder;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.utils.ShardingColumnsUtil;
 
 import java.util.Map;
@@ -28,9 +29,9 @@ import java.util.Set;
 /**
  * MySQL SQL builder.
  */
-public final class MySQLSQLBuilder extends AbstractSQLBuilder {
+public final class MySQLScalingSQLBuilder extends AbstractScalingSQLBuilder 
implements ScalingSQLBuilder {
     
-    public MySQLSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
+    public MySQLScalingSQLBuilder(final Map<String, Set<String>> 
shardingColumnsMap) {
         super(shardingColumnsMap);
     }
     
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLSQLBuilderTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLScalingSQLBuilderTest.java
similarity index 92%
rename from 
shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLSQLBuilderTest.java
rename to 
shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLScalingSQLBuilderTest.java
index 94203bc..61ea317 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLSQLBuilderTest.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLScalingSQLBuilderTest.java
@@ -29,9 +29,9 @@ import java.util.Set;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
-public final class MySQLSQLBuilderTest {
+public final class MySQLScalingSQLBuilderTest {
     
-    private final MySQLSQLBuilder sqlBuilder = new 
MySQLSQLBuilder(ImmutableMap.<String, Set<String>>builder().put("t2", 
Sets.newHashSet("sc")).build());
+    private final MySQLScalingSQLBuilder sqlBuilder = new 
MySQLScalingSQLBuilder(ImmutableMap.<String, Set<String>>builder().put("t2", 
Sets.newHashSet("sc")).build());
     
     @Test
     public void assertBuildInsertSQL() {
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
index 42fa66f..6ddec40 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.postgresql;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import 
org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
@@ -29,6 +30,7 @@ import 
org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLDataSour
 import 
org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLImporter;
 import 
org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLJdbcDumper;
 import 
org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLPositionManager;
+import 
org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLScalingSQLBuilder;
 import 
org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLWalDumper;
 
 /**
@@ -67,6 +69,11 @@ public final class PostgreSQLScalingEntry implements 
ScalingEntry {
     }
     
     @Override
+    public Class<? extends ScalingSQLBuilder> getSQLBuilderClass() {
+        return PostgreSQLScalingSQLBuilder.class;
+    }
+    
+    @Override
     public String getDatabaseType() {
         return "PostgreSQL";
     }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLDataConsistencyChecker.java
 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLDataConsistencyChecker.java
index 8e02c3d..2063120 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLDataConsistencyChecker.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLDataConsistencyChecker.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.scaling.postgresql.component;
 
 import com.google.common.collect.Maps;
-import 
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
 import 
org.apache.shardingsphere.scaling.core.job.check.AbstractDataConsistencyChecker;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
@@ -41,7 +41,7 @@ public final class PostgreSQLDataConsistencyChecker extends 
AbstractDataConsiste
     }
     
     @Override
-    protected AbstractSQLBuilder getSqlBuilder() {
-        return new PostgreSQLSQLBuilder(Maps.newHashMap());
+    protected ScalingSQLBuilder getSqlBuilder() {
+        return new PostgreSQLScalingSQLBuilder(Maps.newHashMap());
     }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLImporter.java
 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLImporter.java
index 2409029..bae6296 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLImporter.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLImporter.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.scaling.postgresql.component;
 import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
-import 
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 
 import java.util.Map;
 import java.util.Set;
@@ -35,8 +35,8 @@ public final class PostgreSQLImporter extends 
AbstractJDBCImporter {
     }
     
     @Override
-    protected AbstractSQLBuilder createSQLBuilder(final Map<String, 
Set<String>> shardingColumnsMap) {
-        return new PostgreSQLSQLBuilder(shardingColumnsMap);
+    protected ScalingSQLBuilder createSQLBuilder(final Map<String, 
Set<String>> shardingColumnsMap) {
+        return new PostgreSQLScalingSQLBuilder(shardingColumnsMap);
     }
 }
 
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLSQLBuilder.java
 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLScalingSQLBuilder.java
similarity index 83%
rename from 
shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLSQLBuilder.java
rename to 
shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLScalingSQLBuilder.java
index b5bb520..5316bb6 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLSQLBuilder.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLScalingSQLBuilder.java
@@ -20,7 +20,8 @@ package 
org.apache.shardingsphere.scaling.postgresql.component;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
-import 
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.AbstractScalingSQLBuilder;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 
 import java.util.Map;
 import java.util.Set;
@@ -28,9 +29,9 @@ import java.util.Set;
 /**
  * PostgreSQL SQL builder.
  */
-public final class PostgreSQLSQLBuilder extends AbstractSQLBuilder {
+public final class PostgreSQLScalingSQLBuilder extends 
AbstractScalingSQLBuilder implements ScalingSQLBuilder {
     
-    public PostgreSQLSQLBuilder(final Map<String, Set<String>> 
shardingColumnsMap) {
+    public PostgreSQLScalingSQLBuilder(final Map<String, Set<String>> 
shardingColumnsMap) {
         super(shardingColumnsMap);
     }
     
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLSqlBuilderTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLScalingSQLBuilderTest.java
similarity index 91%
rename from 
shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLSqlBuilderTest.java
rename to 
shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLScalingSQLBuilderTest.java
index 247a6b2..70bf3c2 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLSqlBuilderTest.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLScalingSQLBuilderTest.java
@@ -27,11 +27,11 @@ import org.postgresql.replication.LogSequenceNumber;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
-public final class PostgreSQLSqlBuilderTest {
+public final class PostgreSQLScalingSQLBuilderTest {
     
     @Test
     public void assertBuildInsertSQL() {
-        String actual = new 
PostgreSQLSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
+        String actual = new 
PostgreSQLScalingSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
         assertThat(actual, is("INSERT INTO \"t_order\"(\"id\",\"name\") 
VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
     }
     

Reply via email to