This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b85968da94 Add PipelineSQLBuilderEngine (#27090)
6b85968da94 is described below

commit 6b85968da947762c7f8d425055a42a738ed49af7
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Jul 13 15:02:46 2023 +0800

    Add PipelineSQLBuilderEngine (#27090)
    
    * Refactor AbstractPipelineSQLBuilder
    
    * Refactor FixturePipelineSQLBuilder
    
    * Refactor AbstractPipelineSQLBuilder
    
    * Add PipelineSQLBuilderEngine
    
    * Add PipelineSQLBuilderEngine
---
 .../spi/sqlbuilder/DialectPipelineSQLBuilder.java  |  89 ++++++++++
 .../spi/sqlbuilder/PipelineSQLBuilder.java         | 192 ---------------------
 ...LBuilder.java => PipelineSQLBuilderEngine.java} | 190 ++++++++++++++++----
 ...RC32MatchDataConsistencyCalculateAlgorithm.java |  11 +-
 ...DataMatchDataConsistencyCalculateAlgorithm.java |   7 +-
 .../data/pipeline/core/dumper/InventoryDumper.java |  15 +-
 .../core/importer/sink/PipelineDataSourceSink.java |  19 +-
 .../preparer/InventoryRecordsCountCalculator.java  |  11 +-
 .../core/preparer/InventoryTaskSplitter.java       |   7 +-
 .../datasource/AbstractDataSourcePreparer.java     |   7 +-
 .../checker/AbstractDataSourceChecker.java         |   6 +-
 .../sqlbuilder/FixturePipelineSQLBuilder.java      |  64 +------
 .../common/sqlbuilder/H2PipelineSQLBuilder.java    |  20 ++-
 ...Test.java => PipelineSQLBuilderEngineTest.java} |  41 ++---
 ...eline.spi.sqlbuilder.DialectPipelineSQLBuilder} |   0
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java  |  39 ++---
 ...eline.spi.sqlbuilder.DialectPipelineSQLBuilder} |   0
 .../sqlbuilder/MySQLPipelineSQLBuilderTest.java    |  21 +--
 .../sqlbuilder/OpenGaussPipelineSQLBuilder.java    |  43 ++---
 ...eline.spi.sqlbuilder.DialectPipelineSQLBuilder} |   0
 .../OpenGaussPipelineSQLBuilderTest.java           |  26 +--
 .../sqlbuilder/PostgreSQLPipelineSQLBuilder.java   |  39 +++--
 ...eline.spi.sqlbuilder.DialectPipelineSQLBuilder} |   0
 .../PostgreSQLPipelineSQLBuilderTest.java          |  27 +--
 .../migration/api/impl/MigrationJobAPI.java        |   9 +-
 .../core/fixture/H2PipelineSQLBuilder.java         |  20 +--
 ...eline.spi.sqlbuilder.DialectPipelineSQLBuilder} |   0
 27 files changed, 402 insertions(+), 501 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java
new file mode 100644
index 00000000000..793f35177db
--- /dev/null
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.sqlbuilder;
+
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPI;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Dialect pipeline SQL builder.
+ */
+public interface DialectPipelineSQLBuilder extends DatabaseTypedSPI {
+    
+    /**
+     * Build create schema SQL.
+     *
+     * @param schemaName schema name
+     * @return create schema SQL
+     */
+    default Optional<String> buildCreateSchemaSQL(String schemaName) {
+        return Optional.empty();
+    }
+    
+    /**
+     * Build insert SQL on duplicate part.
+     *
+     * @param schemaName schema name
+     * @param dataRecord data record
+     * @return insert SQL on duplicate part
+     */
+    default Optional<String> buildInsertSQLOnDuplicatePart(String schemaName, 
DataRecord dataRecord) {
+        return Optional.empty();
+    }
+    
+    /**
+     * Extract updated columns.
+     *
+     * @param dataRecord data record
+     * @return filtered columns
+     */
+    List<Column> extractUpdatedColumns(DataRecord dataRecord);
+    
+    /**
+     * Build estimated count SQL.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @return estimated count SQL
+     */
+    Optional<String> buildEstimatedCountSQL(String schemaName, String 
tableName);
+    
+    /**
+     * Build CRC32 SQL.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param column column
+     * @return CRC32 SQL
+     */
+    default Optional<String> buildCRC32SQL(final String schemaName, final 
String tableName, final String column) {
+        return Optional.empty();
+    }
+    
+    /**
+     * Judge whether the item is a keyword.
+     * 
+     * @param item item to be judged
+     * @return is keyword or not
+     */
+    boolean isKeyword(String item);
+}
diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
deleted file mode 100644
index 646f1dc907a..00000000000
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.sqlbuilder;
-
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPI;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-
-/**
- * Pipeline SQL builder.
- */
-public interface PipelineSQLBuilder extends DatabaseTypedSPI {
-    
-    /**
-     * Build create schema SQL.
-     *
-     * @param schemaName schema name
-     * @return create schema SQL
-     */
-    default Optional<String> buildCreateSchemaSQL(String schemaName) {
-        return Optional.empty();
-    }
-    
-    /**
-     * Build divisible inventory dump SQL.
-     *
-     * @param schemaName schema name
-     * @param tableName table name
-     * @param columnNames column names
-     * @param uniqueKey unique key
-     * @return divisible inventory dump SQL
-     */
-    String buildDivisibleInventoryDumpSQL(String schemaName, String tableName, 
List<String> columnNames, String uniqueKey);
-    
-    /**
-     * Build divisible inventory dump SQL without end value.
-     *
-     * @param schemaName schema name
-     * @param tableName table name
-     * @param columnNames column names
-     * @param uniqueKey unique key
-     * @return divisible inventory dump SQL without end value
-     */
-    String buildDivisibleInventoryDumpSQLNoEnd(String schemaName, String 
tableName, List<String> columnNames, String uniqueKey);
-    
-    /**
-     * Build indivisible inventory dump first SQL.
-     *
-     * @param schemaName schema name
-     * @param tableName table name
-     * @param columnNames column names
-     * @param uniqueKey unique key
-     * @return indivisible inventory dump SQL
-     */
-    String buildIndivisibleInventoryDumpSQL(String schemaName, String 
tableName, List<String> columnNames, String uniqueKey);
-    
-    /**
-     * Build no unique key inventory dump SQL.
-     *
-     * @param schemaName schema name
-     * @param tableName tableName
-     * @return inventory dump all SQL
-     */
-    String buildNoUniqueKeyInventoryDumpSQL(String schemaName, String 
tableName);
-    
-    /**
-     * Build insert SQL.
-     *
-     * @param schemaName schema name
-     * @param dataRecord data record
-     * @return insert SQL
-     */
-    String buildInsertSQL(String schemaName, DataRecord dataRecord);
-    
-    /**
-     * Build update SQL.
-     *
-     * @param schemaName schema name
-     * @param dataRecord data record
-     * @param conditionColumns condition columns
-     * @return update SQL
-     */
-    String buildUpdateSQL(String schemaName, DataRecord dataRecord, 
Collection<Column> conditionColumns);
-    
-    /**
-     * Extract updated columns.
-     *
-     * @param dataRecord data record
-     * @return filtered columns
-     */
-    // TODO Consider remove extractUpdatedColumns. openGauss has special impl 
currently
-    List<Column> extractUpdatedColumns(DataRecord dataRecord);
-    
-    /**
-     * Build delete SQL.
-     *
-     * @param schemaName schema name
-     * @param dataRecord data record
-     * @param conditionColumns condition columns
-     * @return delete SQL
-     */
-    String buildDeleteSQL(String schemaName, DataRecord dataRecord, 
Collection<Column> conditionColumns);
-    
-    /**
-     * Build drop SQL.
-     *
-     * @param schemaName schema name
-     * @param tableName table name
-     * @return drop SQL
-     */
-    String buildDropSQL(String schemaName, String tableName);
-    
-    /**
-     * Build count SQL.
-     *
-     * @param schemaName schema name
-     * @param tableName table name
-     * @return count SQL
-     */
-    String buildCountSQL(String schemaName, String tableName);
-    
-    /**
-     * Build estimated count SQL.
-     *
-     * @param schemaName schema name
-     * @param tableName table name
-     * @return estimated count SQL
-     */
-    Optional<String> buildEstimatedCountSQL(String schemaName, String 
tableName);
-    
-    /**
-     * Build unique key minimum maximum values SQL.
-     *
-     * @param schemaName schema name
-     * @param tableName table name
-     * @param uniqueKey unique key
-     * @return min max unique key SQL
-     */
-    String buildUniqueKeyMinMaxValuesSQL(String schemaName, String tableName, 
String uniqueKey);
-    
-    /**
-     * Build query all ordering SQL.
-     *
-     * @param schemaName schema name
-     * @param tableName table name
-     * @param columnNames column names
-     * @param uniqueKey unique key, it may be primary key, not null
-     * @param firstQuery first query
-     * @return query SQL
-     */
-    String buildQueryAllOrderingSQL(String schemaName, String tableName, 
List<String> columnNames, String uniqueKey, boolean firstQuery);
-    
-    /**
-     * Build check empty SQL.
-     *
-     * @param schemaName schema name
-     * @param tableName table name
-     * @return check SQL
-     */
-    String buildCheckEmptySQL(String schemaName, String tableName);
-    
-    /**
-     * Build CRC32 SQL.
-     *
-     * @param schemaName schema name
-     * @param tableName table Name
-     * @param column column
-     * @return CRC32 SQL
-     */
-    default Optional<String> buildCRC32SQL(final String schemaName, final 
String tableName, final String column) {
-        return Optional.empty();
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/AbstractPipelineSQLBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
similarity index 62%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/AbstractPipelineSQLBuilder.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
index 0c6d23c5262..48e2f3c699a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
@@ -20,20 +20,21 @@ package 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder;
 import com.google.common.base.Strings;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import 
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
-import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 /**
- * Abstract pipeline SQL builder.
+ * Pipeline SQL builder engine.
  */
-public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder 
{
+public final class PipelineSQLBuilderEngine {
     
     private static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
     
@@ -43,6 +44,15 @@ public abstract class AbstractPipelineSQLBuilder implements 
PipelineSQLBuilder {
     
     private final ConcurrentMap<String, String> sqlCacheMap = new 
ConcurrentHashMap<>();
     
+    private final DatabaseType databaseType;
+    
+    private final DialectPipelineSQLBuilder pipelineSQLBuilder;
+    
+    public PipelineSQLBuilderEngine(final DatabaseType databaseType) {
+        this.databaseType = databaseType;
+        pipelineSQLBuilder = 
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, 
databaseType);
+    }
+    
     /**
      * Add left and right identifier quote string.
      *
@@ -50,26 +60,28 @@ public abstract class AbstractPipelineSQLBuilder implements 
PipelineSQLBuilder {
      * @return add quote string
      */
     public String quote(final String item) {
-        return isKeyword(item) ? getLeftIdentifierQuoteString() + item + 
getRightIdentifierQuoteString() : item;
+        return pipelineSQLBuilder.isKeyword(item) ? 
databaseType.getQuoteCharacter().wrap(item) : item;
     }
     
-    protected abstract boolean isKeyword(String item);
-    
     /**
-     * Get left identifier quote string.
+     * Build create schema SQL.
      *
-     * @return string
+     * @param schemaName schema name
+     * @return create schema SQL
      */
-    protected abstract String getLeftIdentifierQuoteString();
+    public Optional<String> buildCreateSchemaSQL(final String schemaName) {
+        return pipelineSQLBuilder.buildCreateSchemaSQL(schemaName);
+    }
     
     /**
-     * Get right identifier quote string.
+     * Build divisible inventory dump SQL.
      *
-     * @return string
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param columnNames column names
+     * @param uniqueKey unique key
+     * @return divisible inventory dump SQL
      */
-    protected abstract String getRightIdentifierQuoteString();
-    
-    @Override
     public String buildDivisibleInventoryDumpSQL(final String schemaName, 
final String tableName, final List<String> columnNames, final String uniqueKey) 
{
         String qualifiedTableName = getQualifiedTableName(schemaName, 
tableName);
         String quotedUniqueKey = quote(uniqueKey);
@@ -83,42 +95,78 @@ public abstract class AbstractPipelineSQLBuilder implements 
PipelineSQLBuilder {
         return 
columnNames.stream().map(this::quote).collect(Collectors.joining(","));
     }
     
-    @Override
-    public String buildDivisibleInventoryDumpSQLNoEnd(final String schemaName, 
final String tableName, final List<String> columnNames, final String uniqueKey) 
{
+    /**
+     * Build divisible inventory dump SQL without end value.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param columnNames column names
+     * @param uniqueKey unique key
+     * @return divisible inventory dump SQL without end value
+     */
+    public String buildNoLimitedDivisibleInventoryDumpSQL(final String 
schemaName, final String tableName, final List<String> columnNames, final 
String uniqueKey) {
         String qualifiedTableName = getQualifiedTableName(schemaName, 
tableName);
         String quotedUniqueKey = quote(uniqueKey);
         return String.format("SELECT %s FROM %s WHERE %s>=? ORDER BY %s ASC", 
buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey, 
quotedUniqueKey);
     }
     
-    @Override
+    /**
+     * Build indivisible inventory dump first SQL.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param columnNames column names
+     * @param uniqueKey unique key
+     * @return indivisible inventory dump SQL
+     */
     public String buildIndivisibleInventoryDumpSQL(final String schemaName, 
final String tableName, final List<String> columnNames, final String uniqueKey) 
{
         String qualifiedTableName = getQualifiedTableName(schemaName, 
tableName);
         String quotedUniqueKey = quote(uniqueKey);
         return String.format("SELECT %s FROM %s ORDER BY %s ASC", 
buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey);
     }
     
-    @Override
+    /**
+     * Build no unique key inventory dump SQL.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @return inventory dump all SQL
+     */
     public String buildNoUniqueKeyInventoryDumpSQL(final String schemaName, 
final String tableName) {
         String qualifiedTableName = getQualifiedTableName(schemaName, 
tableName);
         return String.format("SELECT * FROM %s", qualifiedTableName);
     }
     
-    protected final String getQualifiedTableName(final String schemaName, 
final String tableName) {
+    /**
+     * Get qualified table name.
+     * 
+     * @param schemaName schema name
+     * @param tableName table name
+     * @return qualified table name
+     */
+    public String getQualifiedTableName(final String schemaName, final String 
tableName) {
         StringBuilder result = new StringBuilder();
-        if (getType().isSchemaAvailable() && 
!Strings.isNullOrEmpty(schemaName)) {
+        if (databaseType.isSchemaAvailable() && 
!Strings.isNullOrEmpty(schemaName)) {
             result.append(quote(schemaName)).append('.');
         }
         result.append(quote(tableName));
         return result.toString();
     }
     
-    @Override
+    /**
+     * Build insert SQL.
+     *
+     * @param schemaName schema name
+     * @param dataRecord data record
+     * @return insert SQL
+     */
     public String buildInsertSQL(final String schemaName, final DataRecord 
dataRecord) {
         String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX + 
dataRecord.getTableName();
         if (!sqlCacheMap.containsKey(sqlCacheKey)) {
             sqlCacheMap.put(sqlCacheKey, buildInsertSQLInternal(schemaName, 
dataRecord.getTableName(), dataRecord.getColumns()));
         }
-        return sqlCacheMap.get(sqlCacheKey);
+        String insertSQL = sqlCacheMap.get(sqlCacheKey);
+        return pipelineSQLBuilder.buildInsertSQLOnDuplicatePart(schemaName, 
dataRecord).map(optional -> insertSQL + " " + optional).orElse(insertSQL);
     }
     
     private String buildInsertSQLInternal(final String schemaName, final 
String tableName, final List<Column> columns) {
@@ -133,7 +181,14 @@ public abstract class AbstractPipelineSQLBuilder 
implements PipelineSQLBuilder {
         return String.format("INSERT INTO %s(%s) VALUES(%s)", 
getQualifiedTableName(schemaName, tableName), columnsLiteral, holder);
     }
     
-    @Override
+    /**
+     * Build update SQL.
+     *
+     * @param schemaName schema name
+     * @param dataRecord data record
+     * @param conditionColumns condition columns
+     * @return update SQL
+     */
     public String buildUpdateSQL(final String schemaName, final DataRecord 
dataRecord, final Collection<Column> conditionColumns) {
         String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + 
dataRecord.getTableName();
         if (!sqlCacheMap.containsKey(sqlCacheKey)) {
@@ -151,12 +206,24 @@ public abstract class AbstractPipelineSQLBuilder 
implements PipelineSQLBuilder {
         return String.format("UPDATE %s SET %%s WHERE %s", 
getQualifiedTableName(schemaName, tableName), buildWhereSQL(conditionColumns));
     }
     
-    @Override
+    /**
+     * Extract updated columns.
+     *
+     * @param dataRecord data record
+     * @return filtered columns
+     */
     public List<Column> extractUpdatedColumns(final DataRecord dataRecord) {
-        return new ArrayList<>(RecordUtils.extractUpdatedColumns(dataRecord));
+        return pipelineSQLBuilder.extractUpdatedColumns(dataRecord);
     }
     
-    @Override
+    /**
+     * Build delete SQL.
+     *
+     * @param schemaName schema name
+     * @param dataRecord data record
+     * @param conditionColumns condition columns
+     * @return delete SQL
+     */
     public String buildDeleteSQL(final String schemaName, final DataRecord 
dataRecord, final Collection<Column> conditionColumns) {
         String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX + 
dataRecord.getTableName();
         if (!sqlCacheMap.containsKey(sqlCacheKey)) {
@@ -165,7 +232,13 @@ public abstract class AbstractPipelineSQLBuilder 
implements PipelineSQLBuilder {
         return sqlCacheMap.get(sqlCacheKey);
     }
     
-    @Override
+    /**
+     * Build drop SQL.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @return drop SQL
+     */
     public String buildDropSQL(final String schemaName, final String 
tableName) {
         return String.format("DROP TABLE IF EXISTS %s", 
getQualifiedTableName(schemaName, tableName));
     }
@@ -183,18 +256,51 @@ public abstract class AbstractPipelineSQLBuilder 
implements PipelineSQLBuilder {
         return where.toString();
     }
     
-    @Override
+    /**
+     * Build count SQL.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @return count SQL
+     */
     public String buildCountSQL(final String schemaName, final String 
tableName) {
         return String.format("SELECT COUNT(*) FROM %s", 
getQualifiedTableName(schemaName, tableName));
     }
     
-    @Override
+    /**
+     * Build estimated count SQL.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @return estimated count SQL
+     */
+    public Optional<String> buildEstimatedCountSQL(final String schemaName, 
final String tableName) {
+        return pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, 
tableName);
+    }
+    
+    /**
+     * Build unique key minimum maximum values SQL.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param uniqueKey unique key
+     * @return min max unique key SQL
+     */
     public String buildUniqueKeyMinMaxValuesSQL(final String schemaName, final 
String tableName, final String uniqueKey) {
         String quotedUniqueKey = quote(uniqueKey);
         return String.format("SELECT MIN(%s), MAX(%s) FROM %s", 
quotedUniqueKey, quotedUniqueKey, getQualifiedTableName(schemaName, tableName));
     }
     
-    @Override
+    /**
+     * Build query all ordering SQL.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param columnNames column names
+     * @param uniqueKey unique key, it may be primary key, not null
+     * @param firstQuery first query
+     * @return query SQL
+     */
     public String buildQueryAllOrderingSQL(final String schemaName, final 
String tableName, final List<String> columnNames, final String uniqueKey, final 
boolean firstQuery) {
         String qualifiedTableName = getQualifiedTableName(schemaName, 
tableName);
         String quotedUniqueKey = quote(uniqueKey);
@@ -203,8 +309,26 @@ public abstract class AbstractPipelineSQLBuilder 
implements PipelineSQLBuilder {
                 : String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s 
ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey, 
quotedUniqueKey);
     }
     
-    @Override
+    /**
+     * Build check empty SQL.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @return check SQL
+     */
     public String buildCheckEmptySQL(final String schemaName, final String 
tableName) {
         return String.format("SELECT * FROM %s LIMIT 1", 
getQualifiedTableName(schemaName, tableName));
     }
+    
+    /**
+     * Build CRC32 SQL.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param column column
+     * @return CRC32 SQL
+     */
+    public Optional<String> buildCRC32SQL(final String schemaName, final 
String tableName, final String column) {
+        return pipelineSQLBuilder.buildCRC32SQL(schemaName, tableName, column);
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index c4b098ae36a..58b52de71f2 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -20,14 +20,13 @@ package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.algorithm;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCalculateParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataConsistencyCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedCRC32DataConsistencyCalculateAlgorithmException;
-import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
@@ -54,13 +53,13 @@ public final class 
CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractD
     @Override
     public Iterable<DataConsistencyCalculatedResult> calculate(final 
DataConsistencyCalculateParameter param) {
         DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, param.getDatabaseType());
-        PipelineSQLBuilder sqlBuilder = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, databaseType);
-        List<CalculatedItem> calculatedItems = 
param.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, param, 
each)).collect(Collectors.toList());
+        PipelineSQLBuilderEngine sqlBuilderEngine = new 
PipelineSQLBuilderEngine(databaseType);
+        List<CalculatedItem> calculatedItems = 
param.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilderEngine, 
param, each)).collect(Collectors.toList());
         return Collections.singletonList(new 
CalculatedResult(calculatedItems.get(0).getRecordsCount(), 
calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
     }
     
-    private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, 
final DataConsistencyCalculateParameter param, final String columnName) {
-        Optional<String> sql = sqlBuilder.buildCRC32SQL(param.getSchemaName(), 
param.getLogicTableName(), columnName);
+    private CalculatedItem calculateCRC32(final PipelineSQLBuilderEngine 
sqlBuilderEngine, final DataConsistencyCalculateParameter param, final String 
columnName) {
+        Optional<String> sql = 
sqlBuilderEngine.buildCRC32SQL(param.getSchemaName(), 
param.getLogicTableName(), columnName);
         ShardingSpherePreconditions.checkState(sql.isPresent(), () -> new 
UnsupportedCRC32DataConsistencyCalculateAlgorithmException(param.getDatabaseType()));
         try (
                 Connection connection = param.getDataSource().getConnection();
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index f80ae4e6907..929b2d8d494 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.algorithm;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
 import 
org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCalculateParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataConsistencyCalculatedResult;
@@ -27,10 +28,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Data
 import 
org.apache.shardingsphere.data.pipeline.core.dumper.ColumnValueReaderEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
@@ -165,9 +164,9 @@ public final class 
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
             throw new UnsupportedOperationException("Data consistency of 
DATA_MATCH type not support table without unique key and primary key now");
         }
         DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, param.getDatabaseType());
-        PipelineSQLBuilder sqlBuilder = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, databaseType);
+        PipelineSQLBuilderEngine sqlBuilderEngine = new 
PipelineSQLBuilderEngine(databaseType);
         boolean firstQuery = null == param.getTableCheckPosition();
-        return sqlBuilder.buildQueryAllOrderingSQL(param.getSchemaName(), 
param.getLogicTableName(), param.getColumnNames(), 
param.getUniqueKey().getName(), firstQuery);
+        return 
sqlBuilderEngine.buildQueryAllOrderingSQL(param.getSchemaName(), 
param.getLogicTableName(), param.getColumnNames(), 
param.getUniqueKey().getName(), firstQuery);
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index 86745e5e013..6717c6d8a44 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -40,15 +40,14 @@ import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPo
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.PrimaryKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.PrimaryKeyPositionFactory;
+import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
 import 
org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
 import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
@@ -77,7 +76,7 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
     
     private final DataSource dataSource;
     
-    private final PipelineSQLBuilder sqlBuilder;
+    private final PipelineSQLBuilderEngine sqlBuilderEngine;
     
     private final ColumnValueReaderEngine columnValueReaderEngine;
     
@@ -90,7 +89,7 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
         this.channel = channel;
         this.dataSource = dataSource;
         DatabaseType databaseType = 
dumperConfig.getDataSourceConfig().getDatabaseType();
-        sqlBuilder = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, databaseType);
+        sqlBuilderEngine = new PipelineSQLBuilderEngine(databaseType);
         columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
         this.metaDataLoader = metaDataLoader;
     }
@@ -159,20 +158,20 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
         LogicTableName logicTableName = new 
LogicTableName(dumperConfig.getLogicTableName());
         String schemaName = dumperConfig.getSchemaName(logicTableName);
         if (!dumperConfig.hasUniqueKey()) {
-            return sqlBuilder.buildNoUniqueKeyInventoryDumpSQL(schemaName, 
dumperConfig.getActualTableName());
+            return 
sqlBuilderEngine.buildNoUniqueKeyInventoryDumpSQL(schemaName, 
dumperConfig.getActualTableName());
         }
         PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) 
dumperConfig.getPosition();
         PipelineColumnMetaData firstColumn = 
dumperConfig.getUniqueKeyColumns().get(0);
         List<String> columnNames = 
dumperConfig.getColumnNameList(logicTableName).orElse(Collections.singletonList("*"));
         if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || 
PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
             if (null != position.getBeginValue() && null != 
position.getEndValue()) {
-                return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName, 
dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
+                return 
sqlBuilderEngine.buildDivisibleInventoryDumpSQL(schemaName, 
dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
             }
             if (null != position.getBeginValue() && null == 
position.getEndValue()) {
-                return 
sqlBuilder.buildDivisibleInventoryDumpSQLNoEnd(schemaName, 
dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
+                return 
sqlBuilderEngine.buildNoLimitedDivisibleInventoryDumpSQL(schemaName, 
dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
             }
         }
-        return sqlBuilder.buildIndivisibleInventoryDumpSQL(schemaName, 
dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
+        return sqlBuilderEngine.buildIndivisibleInventoryDumpSQL(schemaName, 
dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
     }
     
     private void setParameters(final PreparedStatement preparedStatement) 
throws SQLException {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index f568a7a8f06..46f920cbd04 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -32,12 +32,11 @@ import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
 import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
 import org.apache.shardingsphere.data.pipeline.core.importer.DataRecordMerger;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -65,10 +64,10 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
     
     private final PipelineDataSourceManager dataSourceManager;
     
-    private final PipelineSQLBuilder pipelineSqlBuilder;
-    
     private final JobRateLimitAlgorithm rateLimitAlgorithm;
     
+    private final PipelineSQLBuilderEngine sqlBuilderEngine;
+    
     private final AtomicReference<Statement> batchInsertStatement = new 
AtomicReference<>();
     
     private final AtomicReference<Statement> updateStatement = new 
AtomicReference<>();
@@ -77,9 +76,9 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
     
     public PipelineDataSourceSink(final ImporterConfiguration importerConfig, 
final PipelineDataSourceManager dataSourceManager) {
         this.importerConfig = importerConfig;
-        rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
         this.dataSourceManager = dataSourceManager;
-        pipelineSqlBuilder = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, 
importerConfig.getDataSourceConfig().getDatabaseType());
+        rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
+        sqlBuilderEngine = new 
PipelineSQLBuilderEngine(importerConfig.getDataSourceConfig().getDatabaseType());
     }
     
     @Override
@@ -202,7 +201,7 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
     
     private void executeBatchInsert(final Connection connection, final 
List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
-        String insertSql = 
pipelineSqlBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), 
dataRecord);
+        String insertSql = 
sqlBuilderEngine.buildInsertSQL(getSchemaName(dataRecord.getTableName()), 
dataRecord);
         try (PreparedStatement preparedStatement = 
connection.prepareStatement(insertSql)) {
             batchInsertStatement.set(preparedStatement);
             preparedStatement.setQueryTimeout(30);
@@ -231,8 +230,8 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
     private void executeUpdate(final Connection connection, final DataRecord 
dataRecord) throws SQLException {
         Set<String> shardingColumns = 
importerConfig.getShardingColumns(dataRecord.getTableName());
         List<Column> conditionColumns = 
RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
-        List<Column> updatedColumns = 
pipelineSqlBuilder.extractUpdatedColumns(dataRecord);
-        String updateSql = 
pipelineSqlBuilder.buildUpdateSQL(getSchemaName(dataRecord.getTableName()), 
dataRecord, conditionColumns);
+        List<Column> updatedColumns = 
sqlBuilderEngine.extractUpdatedColumns(dataRecord);
+        String updateSql = 
sqlBuilderEngine.buildUpdateSQL(getSchemaName(dataRecord.getTableName()), 
dataRecord, conditionColumns);
         try (PreparedStatement preparedStatement = 
connection.prepareStatement(updateSql)) {
             updateStatement.set(preparedStatement);
             for (int i = 0; i < updatedColumns.size(); i++) {
@@ -259,7 +258,7 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
     
     private void executeBatchDelete(final Connection connection, final 
List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
-        String deleteSQL = 
pipelineSqlBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()), 
dataRecord,
+        String deleteSQL = 
sqlBuilderEngine.buildDeleteSQL(getSchemaName(dataRecord.getTableName()), 
dataRecord,
                 RecordUtils.extractConditionColumns(dataRecord, 
importerConfig.getShardingColumns(dataRecord.getTableName())));
         try (PreparedStatement preparedStatement = 
connection.prepareStatement(deleteSQL)) {
             batchDeleteStatement.set(preparedStatement);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
index a5c25e268ed..5b77b34d244 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
@@ -23,9 +23,8 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
-import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
@@ -55,15 +54,15 @@ public final class InventoryRecordsCountCalculator {
     public static long getTableRecordsCount(final InventoryDumperConfiguration 
dumperConfig, final PipelineDataSourceWrapper dataSource) {
         String schemaName = dumperConfig.getSchemaName(new 
LogicTableName(dumperConfig.getLogicTableName()));
         String actualTableName = dumperConfig.getActualTableName();
-        PipelineSQLBuilder pipelineSQLBuilder = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, 
dataSource.getDatabaseType());
-        Optional<String> sql = 
pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
+        PipelineSQLBuilderEngine sqlBuilderEngine = new 
PipelineSQLBuilderEngine(dataSource.getDatabaseType());
+        Optional<String> sql = 
sqlBuilderEngine.buildEstimatedCountSQL(schemaName, actualTableName);
         try {
             if (sql.isPresent()) {
                 DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, 
dataSource.getDatabaseType().getType());
                 long result = getEstimatedCount(databaseType, dataSource, 
sql.get());
-                return result > 0 ? result : getCount(dataSource, 
pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
+                return result > 0 ? result : getCount(dataSource, 
sqlBuilderEngine.buildCountSQL(schemaName, actualTableName));
             }
-            return getCount(dataSource, 
pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
+            return getCount(dataSource, 
sqlBuilderEngine.buildCountSQL(schemaName, actualTableName));
         } catch (final SQLException ex) {
             String uniqueKey = dumperConfig.hasUniqueKey() ? 
dumperConfig.getUniqueKeyColumns().get(0).getName() : "";
             throw new 
SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(), 
uniqueKey, ex);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
index 5a0dae236ef..f773649c86b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
@@ -38,6 +38,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.StringPrim
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.UnsupportedKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataUtils;
+import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
 import 
org.apache.shardingsphere.data.pipeline.common.util.IntervalToRangeIterator;
 import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.core.dumper.InventoryDumper;
@@ -47,8 +48,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsum
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -203,8 +202,8 @@ public final class InventoryTaskSplitter {
     
     private Range<Long> getUniqueKeyValuesRange(final 
InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource, 
final InventoryDumperConfiguration dumperConfig) {
         String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
-        String sql = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, 
jobItemContext.getJobConfig().getSourceDatabaseType())
-                .buildUniqueKeyMinMaxValuesSQL(dumperConfig.getSchemaName(new 
LogicTableName(dumperConfig.getLogicTableName())), 
dumperConfig.getActualTableName(), uniqueKey);
+        PipelineSQLBuilderEngine sqlBuilderEngine = new 
PipelineSQLBuilderEngine(jobItemContext.getJobConfig().getSourceDatabaseType());
+        String sql = 
sqlBuilderEngine.buildUniqueKeyMinMaxValuesSQL(dumperConfig.getSchemaName(new 
LogicTableName(dumperConfig.getLogicTableName())), 
dumperConfig.getActualTableName(), uniqueKey);
         try (
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement();
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
index 7c184645029..fea85711039 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
@@ -24,10 +24,9 @@ import 
org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfigur
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.generator.PipelineDDLGenerator;
-import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -56,14 +55,14 @@ public abstract class AbstractDataSourcePreparer implements 
DataSourcePreparer {
         }
         CreateTableConfiguration createTableConfig = 
param.getCreateTableConfig();
         String defaultSchema = 
targetDatabaseType.getDefaultSchema().orElse(null);
-        PipelineSQLBuilder sqlBuilder = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, targetDatabaseType);
+        PipelineSQLBuilderEngine sqlBuilderEngine = new 
PipelineSQLBuilderEngine(targetDatabaseType);
         Collection<String> createdSchemaNames = new HashSet<>();
         for (CreateTableEntry each : 
createTableConfig.getCreateTableEntries()) {
             String targetSchemaName = 
each.getTargetName().getSchemaName().getOriginal();
             if (null == targetSchemaName || 
targetSchemaName.equalsIgnoreCase(defaultSchema) || 
createdSchemaNames.contains(targetSchemaName)) {
                 continue;
             }
-            Optional<String> sql = 
sqlBuilder.buildCreateSchemaSQL(targetSchemaName);
+            Optional<String> sql = 
sqlBuilderEngine.buildCreateSchemaSQL(targetSchemaName);
             if (sql.isPresent()) {
                 executeCreateSchema(param.getDataSourceManager(), 
each.getTargetDataSourceConfig(), sql.get());
                 createdSchemaNames.add(targetSchemaName);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
index 0dc919fa28a..f4917bec772 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
@@ -19,12 +19,11 @@ package 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.checker
 
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
-import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
 import javax.sql.DataSource;
@@ -68,7 +67,8 @@ public abstract class AbstractDataSourceChecker implements 
DataSourceChecker {
     
     private boolean checkEmpty(final DataSource dataSource, final String 
schemaName, final String tableName) throws SQLException {
         DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, getDatabaseType());
-        String sql = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, 
databaseType).buildCheckEmptySQL(schemaName, tableName);
+        PipelineSQLBuilderEngine sqlBuilderEngine = new 
PipelineSQLBuilderEngine(databaseType);
+        String sql = sqlBuilderEngine.buildCheckEmptySQL(schemaName, 
tableName);
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/FixturePipelineSQLBuilder.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/FixturePipelineSQLBuilder.java
index 58a5f586a30..1177aae74c5 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/FixturePipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/FixturePipelineSQLBuilder.java
@@ -19,38 +19,17 @@ package 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder;
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
 
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
-public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
+public final class FixturePipelineSQLBuilder implements 
DialectPipelineSQLBuilder {
     
     @Override
-    public String buildDivisibleInventoryDumpSQL(final String schemaName, 
final String tableName, final List<String> columnNames, final String uniqueKey) 
{
-        return "";
-    }
-    
-    @Override
-    public String buildDivisibleInventoryDumpSQLNoEnd(final String schemaName, 
final String tableName, final List<String> columnNames, final String uniqueKey) 
{
-        return "";
-    }
-    
-    @Override
-    public String buildIndivisibleInventoryDumpSQL(final String schemaName, 
final String tableName, final List<String> columnNames, final String uniqueKey) 
{
-        return "";
-    }
-    
-    @Override
-    public String buildInsertSQL(final String schemaName, final DataRecord 
dataRecord) {
-        return "";
-    }
-    
-    @Override
-    public String buildUpdateSQL(final String schemaName, final DataRecord 
dataRecord, final Collection<Column> conditionColumns) {
-        return "";
+    public boolean isKeyword(final String item) {
+        return false;
     }
     
     @Override
@@ -58,51 +37,16 @@ public final class FixturePipelineSQLBuilder implements 
PipelineSQLBuilder {
         return Collections.emptyList();
     }
     
-    @Override
-    public String buildDeleteSQL(final String schemaName, final DataRecord 
dataRecord, final Collection<Column> conditionColumns) {
-        return "";
-    }
-    
-    @Override
-    public String buildDropSQL(final String schemaName, final String 
tableName) {
-        return "";
-    }
-    
-    @Override
-    public String buildCountSQL(final String schemaName, final String 
tableName) {
-        return "";
-    }
-    
     @Override
     public Optional<String> buildEstimatedCountSQL(final String schemaName, 
final String tableName) {
         return Optional.empty();
     }
     
-    @Override
-    public String buildUniqueKeyMinMaxValuesSQL(final String schemaName, final 
String tableName, final String uniqueKey) {
-        return "";
-    }
-    
-    @Override
-    public String buildQueryAllOrderingSQL(final String schemaName, final 
String tableName, final List<String> columnNames, final String uniqueKey, final 
boolean firstQuery) {
-        return "";
-    }
-    
-    @Override
-    public String buildCheckEmptySQL(final String schemaName, final String 
tableName) {
-        return null;
-    }
-    
     @Override
     public Optional<String> buildCRC32SQL(final String schemaName, final 
String tableName, final String column) {
         return Optional.of(String.format("SELECT CRC32(%s) FROM %s", column, 
tableName));
     }
     
-    @Override
-    public String buildNoUniqueKeyInventoryDumpSQL(final String schemaName, 
final String tableName) {
-        return "";
-    }
-    
     @Override
     public String getDatabaseType() {
         return "FIXTURE";
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/H2PipelineSQLBuilder.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/H2PipelineSQLBuilder.java
index 7cb22c98bc4..dc4cf156a32 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/H2PipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/H2PipelineSQLBuilder.java
@@ -17,23 +17,25 @@
 
 package org.apache.shardingsphere.data.pipeline.common.sqlbuilder;
 
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import 
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
+import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 
-public final class H2PipelineSQLBuilder extends AbstractPipelineSQLBuilder {
+public final class H2PipelineSQLBuilder implements DialectPipelineSQLBuilder {
     
     @Override
-    protected boolean isKeyword(final String item) {
+    public boolean isKeyword(final String item) {
         return false;
     }
     
     @Override
-    protected String getLeftIdentifierQuoteString() {
-        return "";
-    }
-    
-    @Override
-    protected String getRightIdentifierQuoteString() {
-        return "";
+    public List<Column> extractUpdatedColumns(final DataRecord dataRecord) {
+        return new ArrayList<>(RecordUtils.extractUpdatedColumns(dataRecord));
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngineTest.java
similarity index 69%
rename from 
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderTest.java
rename to 
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngineTest.java
index 76f1b4b8ab0..bf5568e0da1 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngineTest.java
@@ -22,7 +22,8 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
-import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -32,79 +33,79 @@ import java.util.Collections;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-class PipelineSQLBuilderTest {
+class PipelineSQLBuilderEngineTest {
     
-    private final PipelineSQLBuilder pipelineSQLBuilder = new 
H2PipelineSQLBuilder();
+    private final PipelineSQLBuilderEngine sqlBuilderEngine = new 
PipelineSQLBuilderEngine(TypedSPILoader.getService(DatabaseType.class, "H2"));
     
     @Test
     void assertBuildDivisibleInventoryDumpSQL() {
-        String actual = 
pipelineSQLBuilder.buildDivisibleInventoryDumpSQL(null, "t_order", 
Collections.singletonList("*"), "order_id");
+        String actual = sqlBuilderEngine.buildDivisibleInventoryDumpSQL(null, 
"t_order", Collections.singletonList("*"), "order_id");
         assertThat(actual, is("SELECT * FROM t_order WHERE order_id>=? AND 
order_id<=? ORDER BY order_id ASC"));
-        actual = pipelineSQLBuilder.buildDivisibleInventoryDumpSQL(null, 
"t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
+        actual = sqlBuilderEngine.buildDivisibleInventoryDumpSQL(null, 
"t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
         assertThat(actual, is("SELECT order_id,user_id,status FROM t_order 
WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC"));
     }
     
     @Test
     void assertBuildDivisibleInventoryDumpSQLNoEnd() {
-        String actual = 
pipelineSQLBuilder.buildDivisibleInventoryDumpSQLNoEnd(null, "t_order", 
Collections.singletonList("*"), "order_id");
+        String actual = 
sqlBuilderEngine.buildNoLimitedDivisibleInventoryDumpSQL(null, "t_order", 
Collections.singletonList("*"), "order_id");
         assertThat(actual, is("SELECT * FROM t_order WHERE order_id>=? ORDER 
BY order_id ASC"));
-        actual = pipelineSQLBuilder.buildDivisibleInventoryDumpSQLNoEnd(null, 
"t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
+        actual = 
sqlBuilderEngine.buildNoLimitedDivisibleInventoryDumpSQL(null, "t_order", 
Arrays.asList("order_id", "user_id", "status"), "order_id");
         assertThat(actual, is("SELECT order_id,user_id,status FROM t_order 
WHERE order_id>=? ORDER BY order_id ASC"));
     }
     
     @Test
     void assertBuildIndivisibleInventoryDumpSQL() {
-        String actual = 
pipelineSQLBuilder.buildIndivisibleInventoryDumpSQL(null, "t_order", 
Collections.singletonList("*"), "order_id");
+        String actual = 
sqlBuilderEngine.buildIndivisibleInventoryDumpSQL(null, "t_order", 
Collections.singletonList("*"), "order_id");
         assertThat(actual, is("SELECT * FROM t_order ORDER BY order_id ASC"));
-        actual = pipelineSQLBuilder.buildIndivisibleInventoryDumpSQL(null, 
"t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
+        actual = sqlBuilderEngine.buildIndivisibleInventoryDumpSQL(null, 
"t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
         assertThat(actual, is("SELECT order_id,user_id,status FROM t_order 
ORDER BY order_id ASC"));
     }
     
     @Test
     void assertBuildQueryAllOrderingSQLFirstQuery() {
-        String actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, 
"t_order", Collections.singletonList("*"), "order_id", true);
+        String actual = sqlBuilderEngine.buildQueryAllOrderingSQL(null, 
"t_order", Collections.singletonList("*"), "order_id", true);
         assertThat(actual, is("SELECT * FROM t_order ORDER BY order_id ASC"));
-        actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", 
Arrays.asList("order_id", "user_id", "status"), "order_id", true);
+        actual = sqlBuilderEngine.buildQueryAllOrderingSQL(null, "t_order", 
Arrays.asList("order_id", "user_id", "status"), "order_id", true);
         assertThat(actual, is("SELECT order_id,user_id,status FROM t_order 
ORDER BY order_id ASC"));
     }
     
     @Test
     void assertBuildQueryAllOrderingSQLNonFirstQuery() {
-        String actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, 
"t_order", Collections.singletonList("*"), "order_id", false);
+        String actual = sqlBuilderEngine.buildQueryAllOrderingSQL(null, 
"t_order", Collections.singletonList("*"), "order_id", false);
         assertThat(actual, is("SELECT * FROM t_order WHERE order_id>? ORDER BY 
order_id ASC"));
-        actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", 
Arrays.asList("order_id", "user_id", "status"), "order_id", false);
+        actual = sqlBuilderEngine.buildQueryAllOrderingSQL(null, "t_order", 
Arrays.asList("order_id", "user_id", "status"), "order_id", false);
         assertThat(actual, is("SELECT order_id,user_id,status FROM t_order 
WHERE order_id>? ORDER BY order_id ASC"));
     }
     
     @Test
     void assertBuildInsertSQL() {
-        String actual = pipelineSQLBuilder.buildInsertSQL(null, 
mockDataRecord("t2"));
+        String actual = sqlBuilderEngine.buildInsertSQL(null, 
mockDataRecord("t2"));
         assertThat(actual, is("INSERT INTO t2(id,sc,c1,c2,c3) 
VALUES(?,?,?,?,?)"));
     }
     
     @Test
     void assertBuildUpdateSQLWithPrimaryKey() {
-        String actual = pipelineSQLBuilder.buildUpdateSQL(null, 
mockDataRecord("t2"), RecordUtils.extractPrimaryColumns(mockDataRecord("t2")));
+        String actual = sqlBuilderEngine.buildUpdateSQL(null, 
mockDataRecord("t2"), RecordUtils.extractPrimaryColumns(mockDataRecord("t2")));
         assertThat(actual, is("UPDATE t2 SET c1 = ?,c2 = ?,c3 = ? WHERE id = 
?"));
     }
     
     @Test
     void assertBuildUpdateSQLWithShardingColumns() {
         DataRecord dataRecord = mockDataRecord("t2");
-        String actual = pipelineSQLBuilder.buildUpdateSQL(null, dataRecord, 
mockConditionColumns(dataRecord));
+        String actual = sqlBuilderEngine.buildUpdateSQL(null, dataRecord, 
mockConditionColumns(dataRecord));
         assertThat(actual, is("UPDATE t2 SET c1 = ?,c2 = ?,c3 = ? WHERE id = ? 
AND sc = ?"));
     }
     
     @Test
     void assertBuildDeleteSQLWithPrimaryKey() {
-        String actual = pipelineSQLBuilder.buildDeleteSQL(null, 
mockDataRecord("t3"), RecordUtils.extractPrimaryColumns(mockDataRecord("t3")));
+        String actual = sqlBuilderEngine.buildDeleteSQL(null, 
mockDataRecord("t3"), RecordUtils.extractPrimaryColumns(mockDataRecord("t3")));
         assertThat(actual, is("DELETE FROM t3 WHERE id = ?"));
     }
     
     @Test
     void assertBuildDeleteSQLWithConditionColumns() {
         DataRecord dataRecord = mockDataRecord("t3");
-        String actual = pipelineSQLBuilder.buildDeleteSQL(null, dataRecord, 
mockConditionColumns(dataRecord));
+        String actual = sqlBuilderEngine.buildDeleteSQL(null, dataRecord, 
mockConditionColumns(dataRecord));
         assertThat(actual, is("DELETE FROM t3 WHERE id = ? AND sc = ?"));
     }
     
@@ -124,7 +125,7 @@ class PipelineSQLBuilderTest {
     
     @Test
     void assertBuildDeleteSQLWithoutUniqueKey() {
-        String actual = pipelineSQLBuilder.buildDeleteSQL(null, 
mockDataRecordWithoutUniqueKey("t_order"),
+        String actual = sqlBuilderEngine.buildDeleteSQL(null, 
mockDataRecordWithoutUniqueKey("t_order"),
                 
RecordUtils.extractConditionColumns(mockDataRecordWithoutUniqueKey("t_order"), 
Collections.emptySet()));
         assertThat(actual, is("DELETE FROM t_order WHERE id = ? AND name = 
?"));
     }
@@ -132,7 +133,7 @@ class PipelineSQLBuilderTest {
     @Test
     void assertBuildUpdateSQLWithoutShardingColumns() {
         DataRecord dataRecord = mockDataRecordWithoutUniqueKey("t_order");
-        String actual = pipelineSQLBuilder.buildUpdateSQL(null, dataRecord, 
mockConditionColumns(dataRecord));
+        String actual = sqlBuilderEngine.buildUpdateSQL(null, dataRecord, 
mockConditionColumns(dataRecord));
         assertThat(actual, is("UPDATE t_order SET name = ? WHERE id = ? AND 
name = ?"));
     }
     
diff --git 
a/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
 
b/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
similarity index 100%
rename from 
kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
rename to 
kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index c25ffd99ce6..f0df4901488 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -19,17 +19,21 @@ package 
org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder;
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.AbstractPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
+import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 
 /**
  * MySQL pipeline SQL builder.
  */
-public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
+public final class MySQLPipelineSQLBuilder implements 
DialectPipelineSQLBuilder {
     
     private static final Set<String> RESERVED_KEYWORDS = new 
HashSet<>(Arrays.asList(
             "ADD", "ALL", "ALTER", "ANALYZE", "AND", "AS", "ASC", "BEFORE", 
"BETWEEN", "BIGINT", "BINARY", "BLOB", "BOTH", "BY", "CALL",
@@ -50,26 +54,12 @@ public final class MySQLPipelineSQLBuilder extends 
AbstractPipelineSQLBuilder {
             "WHEN", "WHERE", "WHILE", "WINDOW", "WITH", "WRITE", "XOR", 
"YEAR_MONTH", "ZEROFILL"));
     
     @Override
-    protected boolean isKeyword(final String item) {
+    public boolean isKeyword(final String item) {
         return RESERVED_KEYWORDS.contains(item.toUpperCase());
     }
     
     @Override
-    public String getLeftIdentifierQuoteString() {
-        return "`";
-    }
-    
-    @Override
-    public String getRightIdentifierQuoteString() {
-        return "`";
-    }
-    
-    @Override
-    public String buildInsertSQL(final String schemaName, final DataRecord 
dataRecord) {
-        return super.buildInsertSQL(schemaName, dataRecord) + 
buildDuplicateUpdateSQL(dataRecord);
-    }
-    
-    private String buildDuplicateUpdateSQL(final DataRecord dataRecord) {
+    public Optional<String> buildInsertSQLOnDuplicatePart(final String 
schemaName, final DataRecord dataRecord) {
         StringBuilder result = new StringBuilder(" ON DUPLICATE KEY UPDATE ");
         for (int i = 0; i < dataRecord.getColumnCount(); i++) {
             Column column = dataRecord.getColumn(i);
@@ -83,7 +73,12 @@ public final class MySQLPipelineSQLBuilder extends 
AbstractPipelineSQLBuilder {
             
result.append(quote(column.getName())).append("=VALUES(").append(quote(column.getName())).append("),");
         }
         result.setLength(result.length() - 1);
-        return result.toString();
+        return Optional.of(result.toString());
+    }
+    
+    @Override
+    public List<Column> extractUpdatedColumns(final DataRecord dataRecord) {
+        return new ArrayList<>(RecordUtils.extractUpdatedColumns(dataRecord));
     }
     
     @Override
@@ -94,7 +89,11 @@ public final class MySQLPipelineSQLBuilder extends 
AbstractPipelineSQLBuilder {
     @Override
     public Optional<String> buildEstimatedCountSQL(final String schemaName, 
final String tableName) {
         return Optional.of(String.format("SELECT TABLE_ROWS FROM 
INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = '%s'",
-                getQualifiedTableName(schemaName, tableName)));
+                new 
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName, 
tableName)));
+    }
+    
+    private String quote(final String item) {
+        return isKeyword(item) ? getType().getQuoteCharacter().wrap(item) : 
item;
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
 
b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
similarity index 100%
rename from 
kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
rename to 
kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index ec234b30f1a..de50f74f720 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -34,15 +34,15 @@ class MySQLPipelineSQLBuilderTest {
     private final MySQLPipelineSQLBuilder sqlBuilder = new 
MySQLPipelineSQLBuilder();
     
     @Test
-    void assertBuildInsertSQL() {
-        String actual = sqlBuilder.buildInsertSQL(null, mockDataRecord("t1"));
-        assertThat(actual, is("INSERT INTO t1(id,sc,c1,c2,c3) 
VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE 
c1=VALUES(c1),c2=VALUES(c2),c3=VALUES(c3)"));
+    void assertBuildInsertSQLOnDuplicatePart() {
+        String actual = sqlBuilder.buildInsertSQLOnDuplicatePart(null, 
mockDataRecord("t1")).orElse(null);
+        assertThat(actual, is(" ON DUPLICATE KEY UPDATE 
c1=VALUES(c1),c2=VALUES(c2),c3=VALUES(c3)"));
     }
     
     @Test
-    void assertBuildInsertSQLHasShardingColumn() {
-        String actual = sqlBuilder.buildInsertSQL(null, mockDataRecord("t2"));
-        assertThat(actual, is("INSERT INTO t2(id,sc,c1,c2,c3) 
VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE 
c1=VALUES(c1),c2=VALUES(c2),c3=VALUES(c3)"));
+    void assertBuildInsertSQLOnDuplicatePartHasShardingColumn() {
+        String actual = sqlBuilder.buildInsertSQLOnDuplicatePart(null, 
mockDataRecord("t2")).orElse(null);
+        assertThat(actual, is(" ON DUPLICATE KEY UPDATE 
c1=VALUES(c1),c2=VALUES(c2),c3=VALUES(c3)"));
     }
     
     @Test
@@ -62,15 +62,6 @@ class MySQLPipelineSQLBuilderTest {
         return result;
     }
     
-    @Test
-    void assertQuoteKeyword() {
-        String tableName = "CASCADE";
-        String actualCountSql = sqlBuilder.buildCountSQL(null, tableName);
-        assertThat(actualCountSql, is(String.format("SELECT COUNT(*) FROM %s", 
sqlBuilder.quote(tableName))));
-        actualCountSql = sqlBuilder.buildCountSQL(null, 
tableName.toLowerCase());
-        assertThat(actualCountSql, is(String.format("SELECT COUNT(*) FROM %s", 
sqlBuilder.quote(tableName.toLowerCase()))));
-    }
-    
     @Test
     void assertBuilderEstimateCountSQLWithoutKeyword() {
         Optional<String> actual = sqlBuilder.buildEstimatedCountSQL(null, 
"t_order");
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 4e0fc38ab09..ddf0819d33a 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -19,7 +19,8 @@ package 
org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.AbstractPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
 
 import java.util.Arrays;
 import java.util.HashSet;
@@ -31,7 +32,7 @@ import java.util.stream.Collectors;
 /**
  * Pipeline SQL builder of openGauss.
  */
-public final class OpenGaussPipelineSQLBuilder extends 
AbstractPipelineSQLBuilder {
+public final class OpenGaussPipelineSQLBuilder implements 
DialectPipelineSQLBuilder {
     
     private static final Set<String> RESERVED_KEYWORDS = new 
HashSet<>(Arrays.asList(
             "ALL", "ANALYSE", "ANALYZE", "AND", "ANY", "ARRAY", "AS", "ASC", 
"ASYMMETRIC", "AUTHID", "AUTHORIZATION", "BETWEEN", "BIGINT",
@@ -48,36 +49,17 @@ public final class OpenGaussPipelineSQLBuilder extends 
AbstractPipelineSQLBuilde
             "XMLFOREST", "XMLPARSE", "XMLPI", "XMLROOT", "XMLSERIALIZE"));
     
     @Override
-    protected boolean isKeyword(final String item) {
+    public boolean isKeyword(final String item) {
         return RESERVED_KEYWORDS.contains(item.toUpperCase());
     }
     
-    @Override
-    protected String getLeftIdentifierQuoteString() {
-        return "\"";
-    }
-    
-    @Override
-    protected String getRightIdentifierQuoteString() {
-        return "\"";
-    }
-    
     @Override
     public Optional<String> buildCreateSchemaSQL(final String schemaName) {
         return Optional.of(String.format("CREATE SCHEMA %s", 
quote(schemaName)));
     }
     
     @Override
-    public String buildInsertSQL(final String schemaName, final DataRecord 
dataRecord) {
-        return super.buildInsertSQL(schemaName, dataRecord) + 
buildConflictSQL(dataRecord);
-    }
-    
-    @Override
-    public List<Column> extractUpdatedColumns(final DataRecord dataRecord) {
-        return dataRecord.getColumns().stream().filter(each -> 
!(each.isUniqueKey())).collect(Collectors.toList());
-    }
-    
-    private String buildConflictSQL(final DataRecord dataRecord) {
+    public Optional<String> buildInsertSQLOnDuplicatePart(final String 
schemaName, final DataRecord dataRecord) {
         StringBuilder result = new StringBuilder(" ON DUPLICATE KEY UPDATE ");
         for (int i = 0; i < dataRecord.getColumnCount(); i++) {
             Column column = dataRecord.getColumn(i);
@@ -87,13 +69,22 @@ public final class OpenGaussPipelineSQLBuilder extends 
AbstractPipelineSQLBuilde
             
result.append(quote(column.getName())).append("=EXCLUDED.").append(quote(column.getName())).append(',');
         }
         result.setLength(result.length() - 1);
-        return result.toString();
+        return Optional.of(result.toString());
+    }
+    
+    @Override
+    public List<Column> extractUpdatedColumns(final DataRecord dataRecord) {
+        return dataRecord.getColumns().stream().filter(each -> 
!(each.isUniqueKey())).collect(Collectors.toList());
     }
     
     @Override
     public Optional<String> buildEstimatedCountSQL(final String schemaName, 
final String tableName) {
-        String qualifiedTableName = getQualifiedTableName(schemaName, 
tableName);
-        return Optional.of(String.format("SELECT reltuples::integer FROM 
pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
+        return Optional.of(String.format("SELECT reltuples::integer FROM 
pg_class WHERE oid='%s'::regclass::oid;",
+                new 
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName, 
tableName)));
+    }
+    
+    private String quote(final String item) {
+        return isKeyword(item) ? getType().getQuoteCharacter().wrap(item) : 
item;
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
 
b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
similarity index 100%
rename from 
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
rename to 
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
 
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
index eae1df8aa3a..794bb029b4c 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
@@ -23,20 +23,17 @@ import 
org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeTyp
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
 import org.junit.jupiter.api.Test;
 
-import java.util.Optional;
-
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class OpenGaussPipelineSQLBuilderTest {
     
     private final OpenGaussPipelineSQLBuilder sqlBuilder = new 
OpenGaussPipelineSQLBuilder();
     
     @Test
-    void assertBuildInsertSQL() {
-        String actual = sqlBuilder.buildInsertSQL(null, mockDataRecord("t1"));
-        assertThat(actual, is("INSERT INTO t1(id,c0,c1,c2,c3) 
VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE 
c0=EXCLUDED.c0,c1=EXCLUDED.c1,c2=EXCLUDED.c2,c3=EXCLUDED.c3"));
+    void assertBuildInsertSQLOnDuplicatePart() {
+        String actual = sqlBuilder.buildInsertSQLOnDuplicatePart(null, 
mockDataRecord("t1")).orElse(null);
+        assertThat(actual, is(" ON DUPLICATE KEY UPDATE 
c0=EXCLUDED.c0,c1=EXCLUDED.c1,c2=EXCLUDED.c2,c3=EXCLUDED.c3"));
     }
     
     private DataRecord mockDataRecord(final String tableName) {
@@ -48,21 +45,4 @@ class OpenGaussPipelineSQLBuilderTest {
         result.addColumn(new Column("c3", "", true, false));
         return result;
     }
-    
-    @Test
-    void assertQuoteKeyword() {
-        String schemaName = "RECYCLEBIN";
-        Optional<String> actualCreateSchemaSql = 
sqlBuilder.buildCreateSchemaSQL(schemaName);
-        assertTrue(actualCreateSchemaSql.isPresent());
-        assertThat(actualCreateSchemaSql.get(), is(String.format("CREATE 
SCHEMA %s", sqlBuilder.quote(schemaName))));
-        String actualDropSQL = sqlBuilder.buildDropSQL(schemaName, "ALL");
-        String expectedDropSQL = String.format("DROP TABLE IF EXISTS %s", 
String.join(".", sqlBuilder.quote(schemaName), sqlBuilder.quote("ALL")));
-        assertThat(actualDropSQL, is(expectedDropSQL));
-    }
-    
-    @Test
-    void assertBuilderDropSQLWithoutKeyword() {
-        String actualDropSQL = sqlBuilder.buildDropSQL("test_normal", 
"t_order");
-        assertThat(actualDropSQL, is("DROP TABLE IF EXISTS 
test_normal.t_order"));
-    }
 }
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 6a24ed29d86..3b3d05c90ad 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -20,17 +20,20 @@ package 
org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
-import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.AbstractPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 
 /**
  * PostgreSQL pipeline SQL builder.
  */
-public final class PostgreSQLPipelineSQLBuilder extends 
AbstractPipelineSQLBuilder {
+public final class PostgreSQLPipelineSQLBuilder implements 
DialectPipelineSQLBuilder {
     
     private static final Set<String> RESERVED_KEYWORDS = new 
HashSet<>(Arrays.asList(
             "ALL", "ANALYSE", "ANALYZE", "AND", "ANY", "ARRAY", "AS", "ASC", 
"ASYMMETRIC", "AUTHORIZATION", "BETWEEN", "BIGINT", "BINARY",
@@ -44,33 +47,22 @@ public final class PostgreSQLPipelineSQLBuilder extends 
AbstractPipelineSQLBuild
             "WINDOW", "WITH", "XMLATTRIBUTES", "XMLCONCAT", "XMLELEMENT", 
"XMLEXISTS", "XMLFOREST", "XMLNAMESPACES", "XMLPARSE", "XMLPI", "XMLROOT", 
"XMLSERIALIZE", "XMLTABLE"));
     
     @Override
-    protected boolean isKeyword(final String item) {
+    public boolean isKeyword(final String item) {
         return RESERVED_KEYWORDS.contains(item.toUpperCase());
     }
     
-    @Override
-    protected String getLeftIdentifierQuoteString() {
-        return "\"";
-    }
-    
-    @Override
-    protected String getRightIdentifierQuoteString() {
-        return "\"";
-    }
-    
     @Override
     public Optional<String> buildCreateSchemaSQL(final String schemaName) {
         return Optional.of(String.format("CREATE SCHEMA IF NOT EXISTS %s", 
quote(schemaName)));
     }
     
     @Override
-    public String buildInsertSQL(final String schemaName, final DataRecord 
dataRecord) {
-        String result = super.buildInsertSQL(schemaName, dataRecord);
+    public Optional<String> buildInsertSQLOnDuplicatePart(final String 
schemaName, final DataRecord dataRecord) {
         // TODO without unique key, job has been interrupted, which may lead 
to data duplication
         if (dataRecord.getUniqueKeyValue().isEmpty()) {
-            return result;
+            return Optional.empty();
         }
-        return result + buildConflictSQL(dataRecord);
+        return Optional.of(buildConflictSQL(dataRecord));
     }
     
     // Refer to https://www.postgresql.org/docs/current/sql-insert.html
@@ -92,10 +84,19 @@ public final class PostgreSQLPipelineSQLBuilder extends 
AbstractPipelineSQLBuild
         return result.toString();
     }
     
+    @Override
+    public List<Column> extractUpdatedColumns(final DataRecord dataRecord) {
+        return new ArrayList<>(RecordUtils.extractUpdatedColumns(dataRecord));
+    }
+    
     @Override
     public Optional<String> buildEstimatedCountSQL(final String schemaName, 
final String tableName) {
-        String qualifiedTableName = getQualifiedTableName(schemaName, 
tableName);
-        return Optional.of(String.format("SELECT reltuples::integer FROM 
pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
+        return Optional.of(String.format("SELECT reltuples::integer FROM 
pg_class WHERE oid='%s'::regclass::oid;",
+                new 
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName, 
tableName)));
+    }
+    
+    private String quote(final String item) {
+        return isKeyword(item) ? getType().getQuoteCharacter().wrap(item) : 
item;
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
 
b/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
similarity index 100%
rename from 
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
rename to 
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index a4f8f8b2f9a..ba4f6f6ef1f 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -25,21 +25,17 @@ import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.Post
 import org.junit.jupiter.api.Test;
 import org.postgresql.replication.LogSequenceNumber;
 
-import java.util.Optional;
-
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class PostgreSQLPipelineSQLBuilderTest {
     
     private final PostgreSQLPipelineSQLBuilder sqlBuilder = new 
PostgreSQLPipelineSQLBuilder();
     
     @Test
-    void assertBuildInsertSQL() {
-        String actual = sqlBuilder.buildInsertSQL("schema1", mockDataRecord());
-        assertThat(actual, is("INSERT INTO 
schema1.t_order(order_id,user_id,status) VALUES(?,?,?) ON CONFLICT (order_id)"
-                + " DO UPDATE SET 
user_id=EXCLUDED.user_id,status=EXCLUDED.status"));
+    void assertBuildInsertSQLOnDuplicatePart() {
+        String actual = sqlBuilder.buildInsertSQLOnDuplicatePart("schema1", 
mockDataRecord()).orElse(null);
+        assertThat(actual, is(" ON CONFLICT (order_id) DO UPDATE SET 
user_id=EXCLUDED.user_id,status=EXCLUDED.status"));
     }
     
     private DataRecord mockDataRecord() {
@@ -49,21 +45,4 @@ class PostgreSQLPipelineSQLBuilderTest {
         result.addColumn(new Column("status", "ok", true, false));
         return result;
     }
-    
-    @Test
-    void assertQuoteKeyword() {
-        String schemaName = "all";
-        Optional<String> actualCreateSchemaSql = 
sqlBuilder.buildCreateSchemaSQL(schemaName);
-        assertTrue(actualCreateSchemaSql.isPresent());
-        assertThat(actualCreateSchemaSql.get(), is(String.format("CREATE 
SCHEMA IF NOT EXISTS %s", sqlBuilder.quote(schemaName))));
-        String actualDropSQL = sqlBuilder.buildDropSQL(schemaName, "ALL");
-        String expectedDropSQL = String.format("DROP TABLE IF EXISTS %s", 
String.join(".", sqlBuilder.quote(schemaName), sqlBuilder.quote("ALL")));
-        assertThat(actualDropSQL, is(expectedDropSQL));
-    }
-    
-    @Test
-    void assertBuilderDropSQLWithoutKeyword() {
-        String actualDropSQL = sqlBuilder.buildDropSQL("test_normal", 
"t_order");
-        assertThat(actualDropSQL, is("DROP TABLE IF EXISTS 
test_normal.t_order"));
-    }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 23e411f4fba..63c85329450 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -47,11 +47,12 @@ import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineCo
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
+import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
+import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
 import 
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
@@ -72,8 +73,6 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -396,14 +395,14 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
     
     private void cleanTempTableOnRollback(final String jobId) throws 
SQLException {
         MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
-        PipelineSQLBuilder pipelineSQLBuilder = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, 
jobConfig.getTargetDatabaseType());
+        PipelineSQLBuilderEngine sqlBuilderEngine = new 
PipelineSQLBuilderEngine(jobConfig.getTargetDatabaseType());
         TableNameSchemaNameMapping mapping = new 
TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
         try (
                 PipelineDataSourceWrapper dataSource = 
PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
                 Connection connection = dataSource.getConnection()) {
             for (String each : jobConfig.getTargetTableNames()) {
                 String targetSchemaName = mapping.getSchemaName(each);
-                String sql = pipelineSQLBuilder.buildDropSQL(targetSchemaName, 
each);
+                String sql = sqlBuilderEngine.buildDropSQL(targetSchemaName, 
each);
                 log.info("cleanTempTableOnRollback, targetSchemaName={}, 
targetTableName={}, sql={}", targetSchemaName, each, sql);
                 try (Statement statement = connection.createStatement()) {
                     statement.execute(sql);
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
index b3d36544b6d..68b92dd0709 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
@@ -17,25 +17,25 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
 
-import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.AbstractPipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import 
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
+import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 
-public final class H2PipelineSQLBuilder extends AbstractPipelineSQLBuilder {
+public final class H2PipelineSQLBuilder implements DialectPipelineSQLBuilder {
     
     @Override
-    protected boolean isKeyword(final String item) {
+    public boolean isKeyword(final String item) {
         return false;
     }
     
     @Override
-    protected String getLeftIdentifierQuoteString() {
-        return "";
-    }
-    
-    @Override
-    protected String getRightIdentifierQuoteString() {
-        return "";
+    public List<Column> extractUpdatedColumns(final DataRecord dataRecord) {
+        return new ArrayList<>(RecordUtils.extractUpdatedColumns(dataRecord));
     }
     
     @Override
diff --git 
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
 
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
similarity index 100%
rename from 
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
rename to 
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder

Reply via email to