This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e0940401b64 Add SQLSegmentBuilder (#27184)
e0940401b64 is described below

commit e0940401b6439062389cc3ece6d5ab602846e203
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Jul 13 23:41:43 2023 +0800

    Add SQLSegmentBuilder (#27184)
    
    * Move DatabaseTypeEngine.getQualifiedTableName()
    
    * Add SQLSegmentBuilder
---
 .../infra/database/type/DatabaseTypeEngine.java    | 11 ----
 .../infra/sqlbuilder/SQLSegmentBuilder.java        | 57 ++++++++++++++++++
 .../database/type/DatabaseTypeEngineTest.java      | 10 ----
 .../fixture/InfraBranchDatabaseTypeFixture.java    | 13 +++--
 .../infra/sqlbuilder/SQLSegmentBuilderTest.java    | 52 +++++++++++++++++
 .../sqlbuilder/PipelineSQLBuilderEngine.java       | 68 +++++++++-------------
 .../common/sqlbuilder/H2PipelineSQLBuilder.java    |  3 +-
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java  | 14 ++---
 .../sqlbuilder/OpenGaussPipelineSQLBuilder.java    | 14 ++---
 .../sqlbuilder/PostgreSQLPipelineSQLBuilder.java   | 15 +++--
 .../core/fixture/H2PipelineSQLBuilder.java         |  4 +-
 11 files changed, 169 insertions(+), 92 deletions(-)

diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
index 85b8f39ac10..65a8f04f8e5 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
@@ -201,15 +201,4 @@ public final class DatabaseTypeEngine {
         }
         return result;
     }
-    
-    /**
-     * Escape identifier if necessary.
-     * 
-     * @param databaseType database type
-     * @param identifier identifier to be processed
-     * @return escaped identifier
-     */
-    public static String escapeIdentifierIfNecessary(final DatabaseType 
databaseType, final String identifier) {
-        return databaseType.isReservedWord(identifier) ? 
databaseType.getQuoteCharacter().wrap(identifier) : identifier;
-    }
 }
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/sqlbuilder/SQLSegmentBuilder.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/sqlbuilder/SQLSegmentBuilder.java
new file mode 100644
index 00000000000..462fc5c7f4e
--- /dev/null
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/sqlbuilder/SQLSegmentBuilder.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.sqlbuilder;
+
+import com.google.common.base.Strings;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+
+/**
+ * SQL segment builder.
+ */
+@RequiredArgsConstructor
+public final class SQLSegmentBuilder {
+    
+    private final DatabaseType databaseType;
+    
+    /**
+     * Get escaped identifier.
+     *
+     * @param identifier identifier to be processed
+     * @return escaped identifier
+     */
+    public String getEscapedIdentifier(final String identifier) {
+        return databaseType.isReservedWord(identifier) ? 
databaseType.getQuoteCharacter().wrap(identifier) : identifier;
+    }
+    
+    /**
+     * Get qualified table name.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @return qualified table name
+     */
+    public String getQualifiedTableName(final String schemaName, final String 
tableName) {
+        StringBuilder result = new StringBuilder();
+        if (databaseType.isSchemaAvailable() && 
!Strings.isNullOrEmpty(schemaName)) {
+            result.append(getEscapedIdentifier(schemaName)).append('.');
+        }
+        result.append(getEscapedIdentifier(tableName));
+        return result.toString();
+    }
+}
diff --git 
a/infra/common/src/test/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngineTest.java
 
b/infra/common/src/test/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngineTest.java
index 9a66d004187..3a409d637ca 100644
--- 
a/infra/common/src/test/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngineTest.java
+++ 
b/infra/common/src/test/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngineTest.java
@@ -169,14 +169,4 @@ class DatabaseTypeEngineTest {
         
assertTrue(actual.contains(TypedSPILoader.getService(DatabaseType.class, 
"MySQL")), "MySQL not present");
         
assertTrue(actual.contains(TypedSPILoader.getService(DatabaseType.class, 
"MariaDB")), "MariaDB not present");
     }
-    
-    @Test
-    void assertEscapeIdentifierIfNecessary() {
-        
assertThat(DatabaseTypeEngine.escapeIdentifierIfNecessary(TypedSPILoader.getService(DatabaseType.class,
 "INFRA.TRUNK.FIXTURE"), "SELECT"), is("`SELECT`"));
-    }
-    
-    @Test
-    void assertEscapeIdentifierIfUnnecessary() {
-        
assertThat(DatabaseTypeEngine.escapeIdentifierIfNecessary(TypedSPILoader.getService(DatabaseType.class,
 "INFRA.TRUNK.FIXTURE"), "INSERT"), is("INSERT"));
-    }
 }
diff --git 
a/infra/common/src/test/java/org/apache/shardingsphere/infra/fixture/InfraBranchDatabaseTypeFixture.java
 
b/infra/common/src/test/java/org/apache/shardingsphere/infra/fixture/InfraBranchDatabaseTypeFixture.java
index 21b1bbda434..6a4bbd8423b 100644
--- 
a/infra/common/src/test/java/org/apache/shardingsphere/infra/fixture/InfraBranchDatabaseTypeFixture.java
+++ 
b/infra/common/src/test/java/org/apache/shardingsphere/infra/fixture/InfraBranchDatabaseTypeFixture.java
@@ -56,12 +56,17 @@ public final class InfraBranchDatabaseTypeFixture 
implements BranchDatabaseType
     }
     
     @Override
-    public String getType() {
-        return "INFRA.BRANCH.FIXTURE";
+    public DatabaseType getTrunkDatabaseType() {
+        return TypedSPILoader.getService(DatabaseType.class, 
"INFRA.TRUNK.FIXTURE");
     }
     
     @Override
-    public DatabaseType getTrunkDatabaseType() {
-        return TypedSPILoader.getService(DatabaseType.class, 
"INFRA.TRUNK.FIXTURE");
+    public boolean isSchemaAvailable() {
+        return true;
+    }
+    
+    @Override
+    public String getType() {
+        return "INFRA.BRANCH.FIXTURE";
     }
 }
diff --git 
a/infra/common/src/test/java/org/apache/shardingsphere/infra/sqlbuilder/SQLSegmentBuilderTest.java
 
b/infra/common/src/test/java/org/apache/shardingsphere/infra/sqlbuilder/SQLSegmentBuilderTest.java
new file mode 100644
index 00000000000..b45d19679d5
--- /dev/null
+++ 
b/infra/common/src/test/java/org/apache/shardingsphere/infra/sqlbuilder/SQLSegmentBuilderTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.sqlbuilder;
+
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+class SQLSegmentBuilderTest {
+    
+    @Test
+    void assertGetEscapedIdentifier() {
+        SQLSegmentBuilder sqlSegmentBuilder = new 
SQLSegmentBuilder(TypedSPILoader.getService(DatabaseType.class, 
"INFRA.TRUNK.FIXTURE"));
+        assertThat(sqlSegmentBuilder.getEscapedIdentifier("SELECT"), 
is("`SELECT`"));
+    }
+    
+    @Test
+    void assertGetUnescapedIdentifier() {
+        SQLSegmentBuilder sqlSegmentBuilder = new 
SQLSegmentBuilder(TypedSPILoader.getService(DatabaseType.class, 
"INFRA.TRUNK.FIXTURE"));
+        assertThat(sqlSegmentBuilder.getEscapedIdentifier("INSERT"), 
is("INSERT"));
+    }
+    
+    @Test
+    void assertGetQualifiedTableNameWithUnsupportedSchema() {
+        SQLSegmentBuilder sqlSegmentBuilder = new 
SQLSegmentBuilder(TypedSPILoader.getService(DatabaseType.class, 
"INFRA.TRUNK.FIXTURE"));
+        assertThat(sqlSegmentBuilder.getQualifiedTableName("foo_schema", 
"foo_tbl"), is("foo_tbl"));
+    }
+    
+    @Test
+    void assertGetQualifiedTableNameWithSupportedSchema() {
+        SQLSegmentBuilder sqlSegmentBuilder = new 
SQLSegmentBuilder(TypedSPILoader.getService(DatabaseType.class, 
"INFRA.BRANCH.FIXTURE"));
+        assertThat(sqlSegmentBuilder.getQualifiedTableName("foo_schema", 
"foo_tbl"), is("foo_schema.foo_tbl"));
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
index d1b325f678c..b8ca191dbb5 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
@@ -17,13 +17,12 @@
 
 package org.apache.shardingsphere.data.pipeline.common.sqlbuilder;
 
-import com.google.common.base.Strings;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.infra.sqlbuilder.SQLSegmentBuilder;
 
 import java.util.Collection;
 import java.util.List;
@@ -43,15 +42,16 @@ public final class PipelineSQLBuilderEngine {
     
     private static final String DELETE_SQL_CACHE_KEY_PREFIX = "DELETE_";
     
-    private final ConcurrentMap<String, String> sqlCacheMap = new 
ConcurrentHashMap<>();
+    private final DialectPipelineSQLBuilder dialectSQLBuilder;
     
-    private final DatabaseType databaseType;
+    private final SQLSegmentBuilder sqlSegmentBuilder;
     
-    private final DialectPipelineSQLBuilder dialectSQLBuilder;
+    private final ConcurrentMap<String, String> sqlCacheMap;
     
     public PipelineSQLBuilderEngine(final DatabaseType databaseType) {
-        this.databaseType = databaseType;
         dialectSQLBuilder = 
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, 
databaseType);
+        sqlSegmentBuilder = new SQLSegmentBuilder(databaseType);
+        sqlCacheMap = new ConcurrentHashMap<>();
     }
     
     /**
@@ -74,8 +74,8 @@ public final class PipelineSQLBuilderEngine {
      * @return divisible inventory dump SQL
      */
     public String buildDivisibleInventoryDumpSQL(final String schemaName, 
final String tableName, final List<String> columnNames, final String uniqueKey) 
{
-        String qualifiedTableName = getQualifiedTableName(schemaName, 
tableName);
-        String quotedUniqueKey = 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, uniqueKey);
+        String qualifiedTableName = 
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
+        String quotedUniqueKey = 
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
         return String.format("SELECT %s FROM %s WHERE %s>=? AND %s<=? ORDER BY 
%s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey, 
quotedUniqueKey, quotedUniqueKey);
     }
     
@@ -83,7 +83,7 @@ public final class PipelineSQLBuilderEngine {
         if (columnNames.isEmpty()) {
             return "*";
         }
-        return columnNames.stream().map(each -> 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, 
each)).collect(Collectors.joining(","));
+        return 
columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
     }
     
     /**
@@ -96,8 +96,8 @@ public final class PipelineSQLBuilderEngine {
      * @return divisible inventory dump SQL without end value
      */
     public String buildNoLimitedDivisibleInventoryDumpSQL(final String 
schemaName, final String tableName, final List<String> columnNames, final 
String uniqueKey) {
-        String qualifiedTableName = getQualifiedTableName(schemaName, 
tableName);
-        String quotedUniqueKey = 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, uniqueKey);
+        String qualifiedTableName = 
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
+        String quotedUniqueKey = 
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
         return String.format("SELECT %s FROM %s WHERE %s>=? ORDER BY %s ASC", 
buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey, 
quotedUniqueKey);
     }
     
@@ -111,8 +111,8 @@ public final class PipelineSQLBuilderEngine {
      * @return indivisible inventory dump SQL
      */
     public String buildIndivisibleInventoryDumpSQL(final String schemaName, 
final String tableName, final List<String> columnNames, final String uniqueKey) 
{
-        String qualifiedTableName = getQualifiedTableName(schemaName, 
tableName);
-        String quotedUniqueKey = 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, uniqueKey);
+        String qualifiedTableName = 
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
+        String quotedUniqueKey = 
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
         return String.format("SELECT %s FROM %s ORDER BY %s ASC", 
buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey);
     }
     
@@ -124,26 +124,10 @@ public final class PipelineSQLBuilderEngine {
      * @return inventory dump all SQL
      */
     public String buildNoUniqueKeyInventoryDumpSQL(final String schemaName, 
final String tableName) {
-        String qualifiedTableName = getQualifiedTableName(schemaName, 
tableName);
+        String qualifiedTableName = 
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
         return String.format("SELECT * FROM %s", qualifiedTableName);
     }
     
-    /**
-     * Get qualified table name.
-     * 
-     * @param schemaName schema name
-     * @param tableName table name
-     * @return qualified table name
-     */
-    public String getQualifiedTableName(final String schemaName, final String 
tableName) {
-        StringBuilder result = new StringBuilder();
-        if (databaseType.isSchemaAvailable() && 
!Strings.isNullOrEmpty(schemaName)) {
-            
result.append(DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, 
schemaName)).append('.');
-        }
-        
result.append(DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, 
tableName));
-        return result.toString();
-    }
-    
     /**
      * Build insert SQL.
      *
@@ -164,12 +148,12 @@ public final class PipelineSQLBuilderEngine {
         StringBuilder columnsLiteral = new StringBuilder();
         StringBuilder holder = new StringBuilder();
         for (Column each : columns) {
-            columnsLiteral.append(String.format("%s,", 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, each.getName())));
+            columnsLiteral.append(String.format("%s,", 
sqlSegmentBuilder.getEscapedIdentifier(each.getName())));
             holder.append("?,");
         }
         columnsLiteral.setLength(columnsLiteral.length() - 1);
         holder.setLength(holder.length() - 1);
-        return String.format("INSERT INTO %s(%s) VALUES(%s)", 
getQualifiedTableName(schemaName, tableName), columnsLiteral, holder);
+        return String.format("INSERT INTO %s(%s) VALUES(%s)", 
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName), columnsLiteral, 
holder);
     }
     
     /**
@@ -187,14 +171,14 @@ public final class PipelineSQLBuilderEngine {
         }
         StringBuilder updatedColumnString = new StringBuilder();
         for (Column each : extractUpdatedColumns(dataRecord)) {
-            updatedColumnString.append(String.format("%s = ?,", 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, each.getName())));
+            updatedColumnString.append(String.format("%s = ?,", 
sqlSegmentBuilder.getEscapedIdentifier(each.getName())));
         }
         updatedColumnString.setLength(updatedColumnString.length() - 1);
         return String.format(sqlCacheMap.get(sqlCacheKey), 
updatedColumnString);
     }
     
     private String buildUpdateSQLInternal(final String schemaName, final 
String tableName, final Collection<Column> conditionColumns) {
-        return String.format("UPDATE %s SET %%s WHERE %s", 
getQualifiedTableName(schemaName, tableName), buildWhereSQL(conditionColumns));
+        return String.format("UPDATE %s SET %%s WHERE %s", 
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName), 
buildWhereSQL(conditionColumns));
     }
     
     /**
@@ -231,17 +215,17 @@ public final class PipelineSQLBuilderEngine {
      * @return drop SQL
      */
     public String buildDropSQL(final String schemaName, final String 
tableName) {
-        return String.format("DROP TABLE IF EXISTS %s", 
getQualifiedTableName(schemaName, tableName));
+        return String.format("DROP TABLE IF EXISTS %s", 
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName));
     }
     
     private String buildDeleteSQLInternal(final String schemaName, final 
String tableName, final Collection<Column> conditionColumns) {
-        return String.format("DELETE FROM %s WHERE %s", 
getQualifiedTableName(schemaName, tableName), buildWhereSQL(conditionColumns));
+        return String.format("DELETE FROM %s WHERE %s", 
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName), 
buildWhereSQL(conditionColumns));
     }
     
     private String buildWhereSQL(final Collection<Column> conditionColumns) {
         StringBuilder where = new StringBuilder();
         for (Column each : conditionColumns) {
-            where.append(String.format("%s = ? AND ", 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, each.getName())));
+            where.append(String.format("%s = ? AND ", 
sqlSegmentBuilder.getEscapedIdentifier(each.getName())));
         }
         where.setLength(where.length() - 5);
         return where.toString();
@@ -255,7 +239,7 @@ public final class PipelineSQLBuilderEngine {
      * @return count SQL
      */
     public String buildCountSQL(final String schemaName, final String 
tableName) {
-        return String.format("SELECT COUNT(*) FROM %s", 
getQualifiedTableName(schemaName, tableName));
+        return String.format("SELECT COUNT(*) FROM %s", 
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName));
     }
     
     /**
@@ -278,8 +262,8 @@ public final class PipelineSQLBuilderEngine {
      * @return min max unique key SQL
      */
     public String buildUniqueKeyMinMaxValuesSQL(final String schemaName, final 
String tableName, final String uniqueKey) {
-        String quotedUniqueKey = 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, uniqueKey);
-        return String.format("SELECT MIN(%s), MAX(%s) FROM %s", 
quotedUniqueKey, quotedUniqueKey, getQualifiedTableName(schemaName, tableName));
+        String quotedUniqueKey = 
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
+        return String.format("SELECT MIN(%s), MAX(%s) FROM %s", 
quotedUniqueKey, quotedUniqueKey, 
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName));
     }
     
     /**
@@ -293,8 +277,8 @@ public final class PipelineSQLBuilderEngine {
      * @return query SQL
      */
     public String buildQueryAllOrderingSQL(final String schemaName, final 
String tableName, final List<String> columnNames, final String uniqueKey, final 
boolean firstQuery) {
-        String qualifiedTableName = getQualifiedTableName(schemaName, 
tableName);
-        String quotedUniqueKey = 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, uniqueKey);
+        String qualifiedTableName = 
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
+        String quotedUniqueKey = 
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
         return firstQuery
                 ? String.format("SELECT %s FROM %s ORDER BY %s ASC", 
buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey)
                 : String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s 
ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey, 
quotedUniqueKey);
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/H2PipelineSQLBuilder.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/H2PipelineSQLBuilder.java
index 7ca85d2aa82..827fce59c1b 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/H2PipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/H2PipelineSQLBuilder.java
@@ -21,6 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
+import org.apache.shardingsphere.infra.sqlbuilder.SQLSegmentBuilder;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -35,7 +36,7 @@ public final class H2PipelineSQLBuilder implements 
DialectPipelineSQLBuilder {
     
     @Override
     public String buildCheckEmptySQL(final String schemaName, final String 
tableName) {
-        return String.format("SELECT * FROM %s LIMIT 1", new 
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName, 
tableName));
+        return String.format("SELECT * FROM %s LIMIT 1", new 
SQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName));
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index c3e2a0f107f..205cd75536d 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -20,9 +20,8 @@ package 
org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
-import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
+import org.apache.shardingsphere.infra.sqlbuilder.SQLSegmentBuilder;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -36,6 +35,7 @@ public final class MySQLPipelineSQLBuilder implements 
DialectPipelineSQLBuilder
     @Override
     public Optional<String> buildInsertSQLOnDuplicateClause(final String 
schemaName, final DataRecord dataRecord) {
         StringBuilder result = new StringBuilder("ON DUPLICATE KEY UPDATE ");
+        SQLSegmentBuilder sqlSegmentBuilder = new SQLSegmentBuilder(getType());
         for (int i = 0; i < dataRecord.getColumnCount(); i++) {
             Column column = dataRecord.getColumn(i);
             if (!column.isUpdated()) {
@@ -45,8 +45,7 @@ public final class MySQLPipelineSQLBuilder implements 
DialectPipelineSQLBuilder
             if (column.isUniqueKey()) {
                 continue;
             }
-            
result.append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(), 
column.getName()))
-                    
.append("=VALUES(").append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(),
 column.getName())).append("),");
+            
result.append(sqlSegmentBuilder.getEscapedIdentifier(column.getName())).append("=VALUES(").append(sqlSegmentBuilder.getEscapedIdentifier(column.getName())).append("),");
         }
         result.setLength(result.length() - 1);
         return Optional.of(result.toString());
@@ -59,19 +58,20 @@ public final class MySQLPipelineSQLBuilder implements 
DialectPipelineSQLBuilder
     
     @Override
     public String buildCheckEmptySQL(final String schemaName, final String 
tableName) {
-        return String.format("SELECT * FROM %s LIMIT 1", new 
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName, 
tableName));
+        return String.format("SELECT * FROM %s LIMIT 1", new 
SQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName));
     }
     
     @Override
     public Optional<String> buildCRC32SQL(final String schemaName, final 
String tableName, final String column) {
+        SQLSegmentBuilder sqlSegmentBuilder = new SQLSegmentBuilder(getType());
         return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS 
UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM %s",
-                DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(), 
column), DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(), tableName)));
+                sqlSegmentBuilder.getEscapedIdentifier(column), 
sqlSegmentBuilder.getEscapedIdentifier(tableName)));
     }
     
     @Override
     public Optional<String> buildEstimatedCountSQL(final String schemaName, 
final String tableName) {
         return Optional.of(String.format("SELECT TABLE_ROWS FROM 
INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = '%s'",
-                new 
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName, 
tableName)));
+                new 
SQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName)));
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 1f3f22fd3e3..4a463f8322e 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -19,9 +19,8 @@ package 
org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
+import org.apache.shardingsphere.infra.sqlbuilder.SQLSegmentBuilder;
 
 import java.util.List;
 import java.util.Optional;
@@ -34,19 +33,20 @@ public final class OpenGaussPipelineSQLBuilder implements 
DialectPipelineSQLBuil
     
     @Override
     public Optional<String> buildCreateSchemaSQL(final String schemaName) {
-        return Optional.of(String.format("CREATE SCHEMA %s", 
DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(), schemaName)));
+        SQLSegmentBuilder sqlSegmentBuilder = new SQLSegmentBuilder(getType());
+        return Optional.of(String.format("CREATE SCHEMA %s", 
sqlSegmentBuilder.getEscapedIdentifier(schemaName)));
     }
     
     @Override
     public Optional<String> buildInsertSQLOnDuplicateClause(final String 
schemaName, final DataRecord dataRecord) {
         StringBuilder result = new StringBuilder("ON DUPLICATE KEY UPDATE ");
+        SQLSegmentBuilder sqlSegmentBuilder = new SQLSegmentBuilder(getType());
         for (int i = 0; i < dataRecord.getColumnCount(); i++) {
             Column column = dataRecord.getColumn(i);
             if (column.isUniqueKey()) {
                 continue;
             }
-            
result.append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(), 
column.getName()))
-                    
.append("=EXCLUDED.").append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(),
 column.getName())).append(',');
+            
result.append(sqlSegmentBuilder.getEscapedIdentifier(column.getName())).append("=EXCLUDED.").append(sqlSegmentBuilder.getEscapedIdentifier(column.getName())).append(',');
         }
         result.setLength(result.length() - 1);
         return Optional.of(result.toString());
@@ -59,13 +59,13 @@ public final class OpenGaussPipelineSQLBuilder implements 
DialectPipelineSQLBuil
     
     @Override
     public String buildCheckEmptySQL(final String schemaName, final String 
tableName) {
-        return String.format("SELECT * FROM %s LIMIT 1", new 
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName, 
tableName));
+        return String.format("SELECT * FROM %s LIMIT 1", new 
SQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName));
     }
     
     @Override
     public Optional<String> buildEstimatedCountSQL(final String schemaName, 
final String tableName) {
         return Optional.of(String.format("SELECT reltuples::integer FROM 
pg_class WHERE oid='%s'::regclass::oid;",
-                new 
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName, 
tableName)));
+                new 
SQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName)));
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index cdf123ac0dc..3a8758019aa 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -20,9 +20,8 @@ package 
org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
-import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
+import org.apache.shardingsphere.infra.sqlbuilder.SQLSegmentBuilder;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -35,7 +34,8 @@ public final class PostgreSQLPipelineSQLBuilder implements 
DialectPipelineSQLBui
     
     @Override
     public Optional<String> buildCreateSchemaSQL(final String schemaName) {
-        return Optional.of(String.format("CREATE SCHEMA IF NOT EXISTS %s", 
DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(), schemaName)));
+        SQLSegmentBuilder sqlSegmentBuilder = new SQLSegmentBuilder(getType());
+        return Optional.of(String.format("CREATE SCHEMA IF NOT EXISTS %s", 
sqlSegmentBuilder.getEscapedIdentifier(schemaName)));
     }
     
     @Override
@@ -55,13 +55,13 @@ public final class PostgreSQLPipelineSQLBuilder implements 
DialectPipelineSQLBui
         }
         result.setLength(result.length() - 1);
         result.append(") DO UPDATE SET ");
+        SQLSegmentBuilder sqlSegmentBuilder = new SQLSegmentBuilder(getType());
         for (int i = 0; i < dataRecord.getColumnCount(); i++) {
             Column column = dataRecord.getColumn(i);
             if (column.isUniqueKey()) {
                 continue;
             }
-            
result.append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(), 
column.getName()))
-                    
.append("=EXCLUDED.").append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(),
 column.getName())).append(',');
+            
result.append(sqlSegmentBuilder.getEscapedIdentifier(column.getName())).append("=EXCLUDED.").append(sqlSegmentBuilder.getEscapedIdentifier(column.getName())).append(',');
         }
         result.setLength(result.length() - 1);
         return result.toString();
@@ -74,13 +74,12 @@ public final class PostgreSQLPipelineSQLBuilder implements 
DialectPipelineSQLBui
     
     @Override
     public String buildCheckEmptySQL(final String schemaName, final String 
tableName) {
-        return String.format("SELECT * FROM %s LIMIT 1", new 
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName, 
tableName));
+        return String.format("SELECT * FROM %s LIMIT 1", new 
SQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName));
     }
     
     @Override
     public Optional<String> buildEstimatedCountSQL(final String schemaName, 
final String tableName) {
-        return Optional.of(String.format("SELECT reltuples::integer FROM 
pg_class WHERE oid='%s'::regclass::oid;",
-                new 
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName, 
tableName)));
+        return Optional.of(String.format("SELECT reltuples::integer FROM 
pg_class WHERE oid='%s'::regclass::oid;", new 
SQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName)));
     }
     
     @Override
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
index f7ff57d8b2c..14c8be6b100 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
@@ -20,8 +20,8 @@ package 
org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
-import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
+import org.apache.shardingsphere.infra.sqlbuilder.SQLSegmentBuilder;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -36,7 +36,7 @@ public final class H2PipelineSQLBuilder implements 
DialectPipelineSQLBuilder {
     
     @Override
     public String buildCheckEmptySQL(final String schemaName, final String 
tableName) {
-        return String.format("SELECT * FROM %s LIMIT 1", new 
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName, 
tableName));
+        return String.format("SELECT * FROM %s LIMIT 1", new 
SQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName));
     }
     
     @Override


Reply via email to