This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7604bfaa262 Rename CommonPipelineSQLBuilder (#27206)
7604bfaa262 is described below

commit 7604bfaa262728478babfce8633a247766084acd
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Jul 15 08:26:17 2023 +0800

    Rename CommonPipelineSQLBuilder (#27206)
    
    * Remove PipelineSQLBuilderEngine
    
    * Add PipelineImportSQLBuilder
    
    * Refactor PipelineSQLBuilderEngine
    
    * Refactor PipelineSQLBuilderEngine
    
    * Rename CommonPipelineSQLBuilder
    
    * Remove useless H2PipelineSQLBuilder
    
    * Remove useless H2PipelineSQLBuilder
---
 ...erEngine.java => CommonPipelineSQLBuilder.java} | 25 +++++--------
 ...RC32MatchDataConsistencyCalculateAlgorithm.java | 10 +++---
 ...DataMatchDataConsistencyCalculateAlgorithm.java |  6 ++--
 .../data/pipeline/core/dumper/InventoryDumper.java |  6 ++--
 .../core/importer/sink/PipelineDataSourceSink.java | 12 +++----
 .../preparer/InventoryRecordsCountCalculator.java  | 10 +++---
 .../core/preparer/InventoryTaskSplitter.java       |  6 ++--
 .../datasource/AbstractDataSourcePreparer.java     |  6 ++--
 .../checker/AbstractDataSourceChecker.java         |  6 ++--
 ...Test.java => CommonPipelineSQLBuilderTest.java} | 12 +++----
 .../sqlbuilder/PipelineImportSQLBuilderTest.java   |  2 +-
 .../fixture/FixturePipelineSQLBuilder.java         |  2 +-
 .../sqlbuilder/fixture/H2PipelineSQLBuilder.java   | 41 ----------------------
 .../datasource/AbstractDataSourceCheckerTest.java  |  2 +-
 ...peline.spi.sqlbuilder.DialectPipelineSQLBuilder |  1 -
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java  | 12 +++----
 .../migration/api/impl/MigrationJobAPI.java        |  6 ++--
 .../core/fixture/H2PipelineSQLBuilder.java         |  3 +-
 18 files changed, 57 insertions(+), 111 deletions(-)
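
In practice the rename is a caller-side change: code that previously constructed PipelineSQLBuilderEngine now constructs CommonPipelineSQLBuilder for the dialect-independent SQL, and the inventory-dump/import builders are instantiated directly instead of being reached through the engine's getters. A rough Java sketch of that migration, based on the hunks below (databaseType, schemaName and dataRecord stand for values each caller already holds):

    // Before: one engine wrapped the common SQL plus the dump/import builders.
    // String insertSQL = new PipelineSQLBuilderEngine(databaseType).getImportSQLBuilder().buildInsertSQL(schemaName, dataRecord);

    // After: common SQL goes through CommonPipelineSQLBuilder; dump/import builders are created where they are used.
    CommonPipelineSQLBuilder pipelineSQLBuilder = new CommonPipelineSQLBuilder(databaseType);
    PipelineImportSQLBuilder importSQLBuilder = new PipelineImportSQLBuilder(databaseType);
    String insertSQL = importSQLBuilder.buildInsertSQL(schemaName, dataRecord);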

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/CommonPipelineSQLBuilder.java
similarity index 85%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/CommonPipelineSQLBuilder.java
index 3031f3ed8ad..f81575f1319 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/CommonPipelineSQLBuilder.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.common.sqlbuilder;
 
-import lombok.Getter;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
@@ -27,23 +26,15 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
- * Pipeline SQL builder engine.
+ * Common pipeline SQL builder.
  */
-public final class PipelineSQLBuilderEngine {
-    
-    @Getter
-    private final PipelineInventoryDumpSQLBuilder inventoryDumpSQLBuilder;
-    
-    @Getter
-    private final PipelineImportSQLBuilder importSQLBuilder;
+public final class CommonPipelineSQLBuilder {
     
     private final DialectPipelineSQLBuilder dialectSQLBuilder;
     
     private final PipelineSQLSegmentBuilder sqlSegmentBuilder;
     
-    public PipelineSQLBuilderEngine(final DatabaseType databaseType) {
-        inventoryDumpSQLBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
-        importSQLBuilder = new PipelineImportSQLBuilder(databaseType);
+    public CommonPipelineSQLBuilder(final DatabaseType databaseType) {
         dialectSQLBuilder = DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, databaseType);
         sqlSegmentBuilder = new PipelineSQLSegmentBuilder(databaseType);
     }
@@ -100,8 +91,8 @@ public final class PipelineSQLBuilderEngine {
      * @return min max unique key SQL
      */
     public String buildUniqueKeyMinMaxValuesSQL(final String schemaName, final String tableName, final String uniqueKey) {
-        String quotedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
-        return String.format("SELECT MIN(%s), MAX(%s) FROM %s", quotedUniqueKey, quotedUniqueKey, sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName));
+        String escapedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
+        return String.format("SELECT MIN(%s), MAX(%s) FROM %s", escapedUniqueKey, escapedUniqueKey, sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName));
     }
     
     /**
@@ -116,10 +107,10 @@ public final class PipelineSQLBuilderEngine {
      */
     public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey, final boolean firstQuery) {
         String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
-        String quotedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
+        String escapedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
         return firstQuery
-                ? String.format("SELECT %s FROM %s ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey)
-                : String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey, quotedUniqueKey);
+                ? String.format("SELECT %s FROM %s ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, escapedUniqueKey)
+                : String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, escapedUniqueKey, escapedUniqueKey);
     }
     
     private String buildQueryColumns(final List<String> columnNames) {
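
For orientation, call sites keep the same shape after the rename; a minimal usage sketch (illustrative only: databaseType is whatever DatabaseType the caller already resolved, and the exact identifier escaping and schema qualification in the generated SQL depend on the dialect's PipelineSQLSegmentBuilder):

    CommonPipelineSQLBuilder builder = new CommonPipelineSQLBuilder(databaseType);
    // Roughly: SELECT MIN(order_id), MAX(order_id) FROM t_order
    String minMaxSQL = builder.buildUniqueKeyMinMaxValuesSQL(null, "t_order", "order_id");
    // Roughly: SELECT * FROM t_order WHERE order_id>? ORDER BY order_id ASC
    String pagingSQL = builder.buildQueryAllOrderingSQL(null, "t_order", Collections.singletonList("*"), "order_id", false);
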
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index 58b52de71f2..3e578eb487d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.consistencycheck.algorithm;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.CommonPipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCalculateParameter;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataConsistencyCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
@@ -53,13 +53,13 @@ public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractD
     @Override
     public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter param) {
         DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, param.getDatabaseType());
-        PipelineSQLBuilderEngine sqlBuilderEngine = new PipelineSQLBuilderEngine(databaseType);
-        List<CalculatedItem> calculatedItems = param.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilderEngine, param, each)).collect(Collectors.toList());
+        CommonPipelineSQLBuilder pipelineSQLBuilder = new CommonPipelineSQLBuilder(databaseType);
+        List<CalculatedItem> calculatedItems = param.getColumnNames().stream().map(each -> calculateCRC32(pipelineSQLBuilder, param, each)).collect(Collectors.toList());
         return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
     }
     
-    private CalculatedItem calculateCRC32(final PipelineSQLBuilderEngine sqlBuilderEngine, final DataConsistencyCalculateParameter param, final String columnName) {
-        Optional<String> sql = sqlBuilderEngine.buildCRC32SQL(param.getSchemaName(), param.getLogicTableName(), columnName);
+    private CalculatedItem calculateCRC32(final CommonPipelineSQLBuilder pipelineSQLBuilder, final DataConsistencyCalculateParameter param, final String columnName) {
+        Optional<String> sql = pipelineSQLBuilder.buildCRC32SQL(param.getSchemaName(), param.getLogicTableName(), columnName);
         ShardingSpherePreconditions.checkState(sql.isPresent(), () -> new UnsupportedCRC32DataConsistencyCalculateAlgorithmException(param.getDatabaseType()));
         try (
                 Connection connection = param.getDataSource().getConnection();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 929b2d8d494..ff314ef957d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.consistencycheck.algorithm;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.CommonPipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCalculateParameter;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataConsistencyCalculatedResult;
@@ -164,9 +164,9 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
             throw new UnsupportedOperationException("Data consistency of DATA_MATCH type not support table without unique key and primary key now");
         }
         DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, param.getDatabaseType());
-        PipelineSQLBuilderEngine sqlBuilderEngine = new PipelineSQLBuilderEngine(databaseType);
+        CommonPipelineSQLBuilder pipelineSQLBuilder = new CommonPipelineSQLBuilder(databaseType);
         boolean firstQuery = null == param.getTableCheckPosition();
-        return sqlBuilderEngine.buildQueryAllOrderingSQL(param.getSchemaName(), param.getLogicTableName(), param.getColumnNames(), param.getUniqueKey().getName(), firstQuery);
+        return pipelineSQLBuilder.buildQueryAllOrderingSQL(param.getSchemaName(), param.getLogicTableName(), param.getColumnNames(), param.getUniqueKey().getName(), firstQuery);
     }
     
     @Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index 124e9be7b9c..fd0d2a35c25 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -41,7 +41,6 @@ import org.apache.shardingsphere.data.pipeline.common.ingest.position.Placeholde
 import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.PrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.PrimaryKeyPositionFactory;
 import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineInventoryDumpSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
 import org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
 import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
@@ -77,7 +76,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
     
     private final DataSource dataSource;
     
-    private final PipelineSQLBuilderEngine sqlBuilderEngine;
+    private final PipelineInventoryDumpSQLBuilder inventoryDumpSQLBuilder;
     
     private final ColumnValueReaderEngine columnValueReaderEngine;
     
@@ -90,7 +89,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
         this.channel = channel;
         this.dataSource = dataSource;
         DatabaseType databaseType = dumperConfig.getDataSourceConfig().getDatabaseType();
-        sqlBuilderEngine = new PipelineSQLBuilderEngine(databaseType);
+        inventoryDumpSQLBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
         columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
         this.metaDataLoader = metaDataLoader;
     }
@@ -156,7 +155,6 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
         if (!Strings.isNullOrEmpty(dumperConfig.getQuerySQL())) {
             return dumperConfig.getQuerySQL();
         }
-        PipelineInventoryDumpSQLBuilder inventoryDumpSQLBuilder = sqlBuilderEngine.getInventoryDumpSQLBuilder();
         LogicTableName logicTableName = new LogicTableName(dumperConfig.getLogicTableName());
         String schemaName = dumperConfig.getSchemaName(logicTableName);
         if (!dumperConfig.hasUniqueKey()) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index 9b05f29f83b..d4d8774c790 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -32,7 +32,7 @@ import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
 import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
-import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineImportSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
 import org.apache.shardingsphere.data.pipeline.core.importer.DataRecordMerger;
@@ -66,7 +66,7 @@ public final class PipelineDataSourceSink implements PipelineSink {
     
     private final JobRateLimitAlgorithm rateLimitAlgorithm;
     
-    private final PipelineSQLBuilderEngine sqlBuilderEngine;
+    private final PipelineImportSQLBuilder importSQLBuilder;
     
     private final AtomicReference<Statement> batchInsertStatement = new AtomicReference<>();
     
@@ -78,7 +78,7 @@ public final class PipelineDataSourceSink implements PipelineSink {
         this.importerConfig = importerConfig;
         this.dataSourceManager = dataSourceManager;
         rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
-        sqlBuilderEngine = new PipelineSQLBuilderEngine(importerConfig.getDataSourceConfig().getDatabaseType());
+        importSQLBuilder = new PipelineImportSQLBuilder(importerConfig.getDataSourceConfig().getDatabaseType());
     }
     
     @Override
@@ -201,7 +201,7 @@ public final class PipelineDataSourceSink implements PipelineSink {
     
     private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
-        String insertSql = sqlBuilderEngine.getImportSQLBuilder().buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord);
+        String insertSql = importSQLBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord);
         try (PreparedStatement preparedStatement = connection.prepareStatement(insertSql)) {
             batchInsertStatement.set(preparedStatement);
             preparedStatement.setQueryTimeout(30);
@@ -231,7 +231,7 @@ public final class PipelineDataSourceSink implements PipelineSink {
         Set<String> shardingColumns = importerConfig.getShardingColumns(dataRecord.getTableName());
         List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
         List<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
-        String updateSql = sqlBuilderEngine.getImportSQLBuilder().buildUpdateSQL(getSchemaName(dataRecord.getTableName()), dataRecord, conditionColumns);
+        String updateSql = importSQLBuilder.buildUpdateSQL(getSchemaName(dataRecord.getTableName()), dataRecord, conditionColumns);
         try (PreparedStatement preparedStatement = connection.prepareStatement(updateSql)) {
             updateStatement.set(preparedStatement);
             for (int i = 0; i < setColumns.size(); i++) {
@@ -258,7 +258,7 @@ public final class PipelineDataSourceSink implements PipelineSink {
     
     private void executeBatchDelete(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
-        String deleteSQL = sqlBuilderEngine.getImportSQLBuilder().buildDeleteSQL(getSchemaName(dataRecord.getTableName()), dataRecord,
+        String deleteSQL = importSQLBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()), dataRecord,
                 RecordUtils.extractConditionColumns(dataRecord, importerConfig.getShardingColumns(dataRecord.getTableName())));
         try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
             batchDeleteStatement.set(preparedStatement);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
index 5b77b34d244..597e9c3d173 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
@@ -23,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.CommonPipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
@@ -54,15 +54,15 @@ public final class InventoryRecordsCountCalculator {
     public static long getTableRecordsCount(final InventoryDumperConfiguration dumperConfig, final PipelineDataSourceWrapper dataSource) {
         String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
         String actualTableName = dumperConfig.getActualTableName();
-        PipelineSQLBuilderEngine sqlBuilderEngine = new PipelineSQLBuilderEngine(dataSource.getDatabaseType());
-        Optional<String> sql = sqlBuilderEngine.buildEstimatedCountSQL(schemaName, actualTableName);
+        CommonPipelineSQLBuilder pipelineSQLBuilder = new CommonPipelineSQLBuilder(dataSource.getDatabaseType());
+        Optional<String> sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
         try {
             if (sql.isPresent()) {
                 DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, dataSource.getDatabaseType().getType());
                 long result = getEstimatedCount(databaseType, dataSource, sql.get());
-                return result > 0 ? result : getCount(dataSource, sqlBuilderEngine.buildCountSQL(schemaName, actualTableName));
+                return result > 0 ? result : getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
             }
-            return getCount(dataSource, sqlBuilderEngine.buildCountSQL(schemaName, actualTableName));
+            return getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
         } catch (final SQLException ex) {
             String uniqueKey = dumperConfig.hasUniqueKey() ? dumperConfig.getUniqueKeyColumns().get(0).getName() : "";
             throw new SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(), uniqueKey, ex);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
index 9461e8bfd80..5f5283c8054 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
@@ -37,7 +37,7 @@ import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.St
 import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.UnsupportedKeyPosition;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataUtils;
-import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.CommonPipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.common.util.IntervalToRangeIterator;
 import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.core.dumper.InventoryDumper;
@@ -201,8 +201,8 @@ public final class InventoryTaskSplitter {
     
     private Range<Long> getUniqueKeyValuesRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
         String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
-        PipelineSQLBuilderEngine sqlBuilderEngine = new PipelineSQLBuilderEngine(jobItemContext.getJobConfig().getSourceDatabaseType());
-        String sql = sqlBuilderEngine.buildUniqueKeyMinMaxValuesSQL(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName(), uniqueKey);
+        CommonPipelineSQLBuilder pipelineSQLBuilder = new CommonPipelineSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
+        String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName(), uniqueKey);
         try (
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
index fea85711039..e2aacaf564b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfigur
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.common.metadata.generator.PipelineDDLGenerator;
-import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.CommonPipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
 
@@ -55,14 +55,14 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
         }
         CreateTableConfiguration createTableConfig = param.getCreateTableConfig();
         String defaultSchema = targetDatabaseType.getDefaultSchema().orElse(null);
-        PipelineSQLBuilderEngine sqlBuilderEngine = new PipelineSQLBuilderEngine(targetDatabaseType);
+        CommonPipelineSQLBuilder pipelineSQLBuilder = new CommonPipelineSQLBuilder(targetDatabaseType);
         Collection<String> createdSchemaNames = new HashSet<>();
         for (CreateTableEntry each : createTableConfig.getCreateTableEntries()) {
             String targetSchemaName = each.getTargetName().getSchemaName().getOriginal();
             if (null == targetSchemaName || targetSchemaName.equalsIgnoreCase(defaultSchema) || createdSchemaNames.contains(targetSchemaName)) {
                 continue;
             }
-            Optional<String> sql = sqlBuilderEngine.buildCreateSchemaSQL(targetSchemaName);
+            Optional<String> sql = pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
             if (sql.isPresent()) {
                 executeCreateSchema(param.getDataSourceManager(), each.getTargetDataSourceConfig(), sql.get());
                 createdSchemaNames.add(targetSchemaName);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
index f4917bec772..26f258e296c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.preparer.datasource.checker
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.CommonPipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
 import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
@@ -67,8 +67,8 @@ public abstract class AbstractDataSourceChecker implements DataSourceChecker {
     
     private boolean checkEmpty(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
         DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, getDatabaseType());
-        PipelineSQLBuilderEngine sqlBuilderEngine = new PipelineSQLBuilderEngine(databaseType);
-        String sql = sqlBuilderEngine.buildCheckEmptySQL(schemaName, tableName);
+        CommonPipelineSQLBuilder pipelineSQLBuilder = new CommonPipelineSQLBuilder(databaseType);
+        String sql = pipelineSQLBuilder.buildCheckEmptySQL(schemaName, tableName);
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = connection.prepareStatement(sql);
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/CommonPipelineSQLBuilderTest.java
similarity index 70%
rename from kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngineTest.java
rename to kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/CommonPipelineSQLBuilderTest.java
index 4e0ea46efba..0e0903e2c77 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngineTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/CommonPipelineSQLBuilderTest.java
@@ -27,23 +27,23 @@ import java.util.Collections;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-class PipelineSQLBuilderEngineTest {
+class CommonPipelineSQLBuilderTest {
     
-    private final PipelineSQLBuilderEngine sqlBuilderEngine = new PipelineSQLBuilderEngine(TypedSPILoader.getService(DatabaseType.class, "H2"));
+    private final CommonPipelineSQLBuilder pipelineSQLBuilder = new CommonPipelineSQLBuilder(TypedSPILoader.getService(DatabaseType.class, "FIXTURE"));
     
     @Test
     void assertBuildQueryAllOrderingSQLFirstQuery() {
-        String actual = sqlBuilderEngine.buildQueryAllOrderingSQL(null, "t_order", Collections.singletonList("*"), "order_id", true);
+        String actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", Collections.singletonList("*"), "order_id", true);
         assertThat(actual, is("SELECT * FROM t_order ORDER BY order_id ASC"));
-        actual = sqlBuilderEngine.buildQueryAllOrderingSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", true);
+        actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", true);
         assertThat(actual, is("SELECT order_id,user_id,status FROM t_order ORDER BY order_id ASC"));
     }
     
     @Test
     void assertBuildQueryAllOrderingSQLNonFirstQuery() {
-        String actual = sqlBuilderEngine.buildQueryAllOrderingSQL(null, "t_order", Collections.singletonList("*"), "order_id", false);
+        String actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", Collections.singletonList("*"), "order_id", false);
         assertThat(actual, is("SELECT * FROM t_order WHERE order_id>? ORDER BY order_id ASC"));
-        actual = sqlBuilderEngine.buildQueryAllOrderingSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", false);
+        actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", false);
         assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>? ORDER BY order_id ASC"));
     }
 }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineImportSQLBuilderTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineImportSQLBuilderTest.java
index 4691b91b70d..6a1cc7808fc 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineImportSQLBuilderTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineImportSQLBuilderTest.java
@@ -34,7 +34,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 class PipelineImportSQLBuilderTest {
     
-    private final PipelineImportSQLBuilder importSQLBuilder = new PipelineImportSQLBuilder(TypedSPILoader.getService(DatabaseType.class, "H2"));
+    private final PipelineImportSQLBuilder importSQLBuilder = new PipelineImportSQLBuilder(TypedSPILoader.getService(DatabaseType.class, "FIXTURE"));
     
     @Test
     void assertBuildInsertSQL() {
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/fixture/FixturePipelineSQLBuilder.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
index 1883a186bc3..2f44ec6f809 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
@@ -25,7 +25,7 @@ public final class FixturePipelineSQLBuilder implements DialectPipelineSQLBuilde
     
     @Override
     public String buildCheckEmptySQL(final String schemaName, final String tableName) {
-        return null;
+        return String.format("SELECT * FROM %s LIMIT 1", tableName);
     }
     
     @Override
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/fixture/H2PipelineSQLBuilder.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/fixture/H2PipelineSQLBuilder.java
deleted file mode 100644
index 06219e9d8ab..00000000000
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/fixture/H2PipelineSQLBuilder.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.common.sqlbuilder.fixture;
-
-import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLSegmentBuilder;
-import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
-
-import java.util.Optional;
-
-public final class H2PipelineSQLBuilder implements DialectPipelineSQLBuilder {
-    
-    @Override
-    public String buildCheckEmptySQL(final String schemaName, final String tableName) {
-        return String.format("SELECT * FROM %s LIMIT 1", new PipelineSQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName));
-    }
-    
-    @Override
-    public Optional<String> buildEstimatedCountSQL(final String schemaName, final String tableName) {
-        return Optional.empty();
-    }
-    
-    @Override
-    public String getDatabaseType() {
-        return "H2";
-    }
-}
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourceCheckerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourceCheckerTest.java
index 25209cd1af4..0a912bb59a4 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourceCheckerTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourceCheckerTest.java
@@ -73,7 +73,7 @@ class AbstractDataSourceCheckerTest {
             
             @Override
             public String getDatabaseType() {
-                return "H2";
+                return "FIXTURE";
             }
         };
         dataSources = new LinkedList<>();
diff --git a/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder b/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
index 21e298f9649..896696eea95 100644
--- a/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
+++ b/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
@@ -16,4 +16,3 @@
 #
 
 org.apache.shardingsphere.data.pipeline.common.sqlbuilder.fixture.FixturePipelineSQLBuilder
-org.apache.shardingsphere.data.pipeline.common.sqlbuilder.fixture.H2PipelineSQLBuilder
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index 8f66b06ed90..74069a1af95 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -53,6 +53,12 @@ public final class MySQLPipelineSQLBuilder implements DialectPipelineSQLBuilder
         return String.format("SELECT * FROM %s LIMIT 1", new PipelineSQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName));
     }
     
+    @Override
+    public Optional<String> buildEstimatedCountSQL(final String schemaName, final String tableName) {
+        return Optional.of(String.format("SELECT TABLE_ROWS FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = '%s'",
+                new PipelineSQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName)));
+    }
+    
     @Override
     public Optional<String> buildCRC32SQL(final String schemaName, final String tableName, final String column) {
         PipelineSQLSegmentBuilder sqlSegmentBuilder = new PipelineSQLSegmentBuilder(getType());
@@ -60,12 +66,6 @@ public final class MySQLPipelineSQLBuilder implements DialectPipelineSQLBuilder
                 sqlSegmentBuilder.getEscapedIdentifier(column), sqlSegmentBuilder.getEscapedIdentifier(tableName)));
     }
     
-    @Override
-    public Optional<String> buildEstimatedCountSQL(final String schemaName, final String tableName) {
-        return Optional.of(String.format("SELECT TABLE_ROWS FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = '%s'",
-                new PipelineSQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName)));
-    }
-    
     @Override
     public String getDatabaseType() {
         return "MySQL";
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 63c85329450..f1bb2dc9599 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -52,7 +52,7 @@ import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
 import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.CommonPipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
@@ -395,14 +395,14 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
     
     private void cleanTempTableOnRollback(final String jobId) throws SQLException {
         MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
-        PipelineSQLBuilderEngine sqlBuilderEngine = new PipelineSQLBuilderEngine(jobConfig.getTargetDatabaseType());
+        CommonPipelineSQLBuilder pipelineSQLBuilder = new CommonPipelineSQLBuilder(jobConfig.getTargetDatabaseType());
         TableNameSchemaNameMapping mapping = new TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
         try (
                 PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
                 Connection connection = dataSource.getConnection()) {
             for (String each : jobConfig.getTargetTableNames()) {
                 String targetSchemaName = mapping.getSchemaName(each);
-                String sql = sqlBuilderEngine.buildDropSQL(targetSchemaName, each);
+                String sql = pipelineSQLBuilder.buildDropSQL(targetSchemaName, each);
                 log.info("cleanTempTableOnRollback, targetSchemaName={}, targetTableName={}, sql={}", targetSchemaName, each, sql);
                 try (Statement statement = connection.createStatement()) {
                     statement.execute(sql);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
index eb048eff7f6..c92ca17c988 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
 
-import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLSegmentBuilder;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
 
 import java.util.Optional;
@@ -26,7 +25,7 @@ public final class H2PipelineSQLBuilder implements DialectPipelineSQLBuilder {
     
     @Override
     public String buildCheckEmptySQL(final String schemaName, final String tableName) {
-        return String.format("SELECT * FROM %s LIMIT 1", new PipelineSQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName));
+        return String.format("SELECT * FROM %s LIMIT 1", tableName);
     }
     
     @Override

