This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8813686a3b8 Add DatabaseTypeEngine.escapeIdentifierIfNecessary() (#27179)
8813686a3b8 is described below

commit 8813686a3b84a4eb76226602e282acc9d114feba
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Jul 13 20:01:01 2023 +0800

    Add DatabaseTypeEngine.escapeIdentifierIfNecessary() (#27179)
    
    * Refactor DialectPipelineSQLBuilder
    
    * Add DatabaseTypeEngine.escapeIdentifierIfNecessary()
    
    * Refactor PipelineSQLBuilderEngine
    
    * Refactor DialectPipelineSQLBuilder
---
 .../infra/database/type/DatabaseTypeEngine.java    | 11 +++++
 .../database/type/DatabaseTypeEngineTest.java      | 13 +++++-
 .../fixture/InfraTrunkDatabaseTypeFixture.java     |  5 +++
 .../spi/sqlbuilder/DialectPipelineSQLBuilder.java  |  6 +--
 .../sqlbuilder/PipelineSQLBuilderEngine.java       | 49 +++++++++-------------
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java  | 13 +++---
 .../sqlbuilder/MySQLPipelineSQLBuilderTest.java    |  4 +-
 .../sqlbuilder/OpenGaussPipelineSQLBuilder.java    | 12 +++---
 .../OpenGaussPipelineSQLBuilderTest.java           |  2 +-
 .../sqlbuilder/PostgreSQLPipelineSQLBuilder.java   | 12 +++---
 .../PostgreSQLPipelineSQLBuilderTest.java          |  2 +-
 11 files changed, 70 insertions(+), 59 deletions(-)

diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
index 65a8f04f8e5..85b8f39ac10 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
@@ -201,4 +201,15 @@ public final class DatabaseTypeEngine {
         }
         return result;
     }
+    
+    /**
+     * Escape identifier if necessary.
+     * 
+     * @param databaseType database type
+     * @param identifier identifier to be processed
+     * @return escaped identifier
+     */
+    public static String escapeIdentifierIfNecessary(final DatabaseType 
databaseType, final String identifier) {
+        return databaseType.isReservedWord(identifier) ? 
databaseType.getQuoteCharacter().wrap(identifier) : identifier;
+    }
 }
diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngineTest.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngineTest.java
index eefef229cb9..9a66d004187 100644
--- a/infra/common/src/test/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngineTest.java
+++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngineTest.java
@@ -165,9 +165,18 @@ class DatabaseTypeEngineTest {
     
     @Test
     void assertGetBranchDatabaseTypes() {
-        Collection<String> trunkDatabaseTypes = Collections.singleton(new 
MySQLDatabaseType().getType());
-        Collection<DatabaseType> actual = 
DatabaseTypeEngine.getTrunkAndBranchDatabaseTypes(trunkDatabaseTypes);
+        Collection<DatabaseType> actual = 
DatabaseTypeEngine.getTrunkAndBranchDatabaseTypes(Collections.singleton("MySQL"));
         
assertTrue(actual.contains(TypedSPILoader.getService(DatabaseType.class, 
"MySQL")), "MySQL not present");
         
assertTrue(actual.contains(TypedSPILoader.getService(DatabaseType.class, 
"MariaDB")), "MariaDB not present");
     }
+    
+    @Test
+    void assertEscapeIdentifierIfNecessary() {
+        
assertThat(DatabaseTypeEngine.escapeIdentifierIfNecessary(TypedSPILoader.getService(DatabaseType.class,
 "INFRA.TRUNK.FIXTURE"), "SELECT"), is("`SELECT`"));
+    }
+    
+    @Test
+    void assertEscapeIdentifierIfUnnecessary() {
+        
assertThat(DatabaseTypeEngine.escapeIdentifierIfNecessary(TypedSPILoader.getService(DatabaseType.class,
 "INFRA.TRUNK.FIXTURE"), "INSERT"), is("INSERT"));
+    }
 }
diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/fixture/InfraTrunkDatabaseTypeFixture.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/fixture/InfraTrunkDatabaseTypeFixture.java
index c74c0c5dba6..ac160bedcca 100644
--- a/infra/common/src/test/java/org/apache/shardingsphere/infra/fixture/InfraTrunkDatabaseTypeFixture.java
+++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/fixture/InfraTrunkDatabaseTypeFixture.java
@@ -33,6 +33,11 @@ public final class InfraTrunkDatabaseTypeFixture implements 
TrunkDatabaseType {
         return QuoteCharacter.BACK_QUOTE;
     }
     
+    @Override
+    public boolean isReservedWord(final String identifier) {
+        return "SELECT".equalsIgnoreCase(identifier);
+    }
+    
     @Override
     public Collection<String> getJdbcUrlPrefixes() {
         return Collections.singleton("jdbc:infra.fixture:");
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java
index d8b112f408f..0cd993db1b3 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java
@@ -40,13 +40,13 @@ public interface DialectPipelineSQLBuilder extends 
DatabaseTypedSPI {
     }
     
     /**
-     * Build insert SQL on duplicate part.
+     * Build on duplicate clause of insert SQL.
      *
      * @param schemaName schema name
      * @param dataRecord data record
-     * @return insert SQL on duplicate part
+     * @return on duplicate clause of insert SQL
      */
-    default Optional<String> buildInsertSQLOnDuplicatePart(String schemaName, 
DataRecord dataRecord) {
+    default Optional<String> buildInsertSQLOnDuplicateClause(String 
schemaName, DataRecord dataRecord) {
         return Optional.empty();
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
index 0dbf806b709..d1b325f678c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
@@ -22,6 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 
 import java.util.Collection;
@@ -46,21 +47,11 @@ public final class PipelineSQLBuilderEngine {
     
     private final DatabaseType databaseType;
     
-    private final DialectPipelineSQLBuilder pipelineSQLBuilder;
+    private final DialectPipelineSQLBuilder dialectSQLBuilder;
     
     public PipelineSQLBuilderEngine(final DatabaseType databaseType) {
         this.databaseType = databaseType;
-        pipelineSQLBuilder = 
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, 
databaseType);
-    }
-    
-    /**
-     * Add left and right identifier quote string.
-     *
-     * @param item to add quote item
-     * @return add quote string
-     */
-    public String quote(final String item) {
-        return databaseType.isReservedWord(item) ? 
databaseType.getQuoteCharacter().wrap(item) : item;
+        dialectSQLBuilder = 
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, 
databaseType);
     }
     
     /**
@@ -70,7 +61,7 @@ public final class PipelineSQLBuilderEngine {
      * @return create schema SQL
      */
     public Optional<String> buildCreateSchemaSQL(final String schemaName) {
-        return pipelineSQLBuilder.buildCreateSchemaSQL(schemaName);
+        return dialectSQLBuilder.buildCreateSchemaSQL(schemaName);
     }
     
     /**
@@ -84,7 +75,7 @@ public final class PipelineSQLBuilderEngine {
      */
     public String buildDivisibleInventoryDumpSQL(final String schemaName, 
final String tableName, final List<String> columnNames, final String uniqueKey) 
{
         String qualifiedTableName = getQualifiedTableName(schemaName, 
tableName);
-        String quotedUniqueKey = quote(uniqueKey);
+        String quotedUniqueKey = 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, uniqueKey);
         return String.format("SELECT %s FROM %s WHERE %s>=? AND %s<=? ORDER BY 
%s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey, 
quotedUniqueKey, quotedUniqueKey);
     }
     
@@ -92,7 +83,7 @@ public final class PipelineSQLBuilderEngine {
         if (columnNames.isEmpty()) {
             return "*";
         }
-        return 
columnNames.stream().map(this::quote).collect(Collectors.joining(","));
+        return columnNames.stream().map(each -> 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, 
each)).collect(Collectors.joining(","));
     }
     
     /**
@@ -106,7 +97,7 @@ public final class PipelineSQLBuilderEngine {
      */
     public String buildNoLimitedDivisibleInventoryDumpSQL(final String 
schemaName, final String tableName, final List<String> columnNames, final 
String uniqueKey) {
         String qualifiedTableName = getQualifiedTableName(schemaName, 
tableName);
-        String quotedUniqueKey = quote(uniqueKey);
+        String quotedUniqueKey = 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, uniqueKey);
         return String.format("SELECT %s FROM %s WHERE %s>=? ORDER BY %s ASC", 
buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey, 
quotedUniqueKey);
     }
     
@@ -121,7 +112,7 @@ public final class PipelineSQLBuilderEngine {
      */
     public String buildIndivisibleInventoryDumpSQL(final String schemaName, 
final String tableName, final List<String> columnNames, final String uniqueKey) 
{
         String qualifiedTableName = getQualifiedTableName(schemaName, 
tableName);
-        String quotedUniqueKey = quote(uniqueKey);
+        String quotedUniqueKey = 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, uniqueKey);
         return String.format("SELECT %s FROM %s ORDER BY %s ASC", 
buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey);
     }
     
@@ -147,9 +138,9 @@ public final class PipelineSQLBuilderEngine {
     public String getQualifiedTableName(final String schemaName, final String 
tableName) {
         StringBuilder result = new StringBuilder();
         if (databaseType.isSchemaAvailable() && 
!Strings.isNullOrEmpty(schemaName)) {
-            result.append(quote(schemaName)).append('.');
+            
result.append(DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, 
schemaName)).append('.');
         }
-        result.append(quote(tableName));
+        
result.append(DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, 
tableName));
         return result.toString();
     }
     
@@ -166,14 +157,14 @@ public final class PipelineSQLBuilderEngine {
             sqlCacheMap.put(sqlCacheKey, buildInsertSQLInternal(schemaName, 
dataRecord.getTableName(), dataRecord.getColumns()));
         }
         String insertSQL = sqlCacheMap.get(sqlCacheKey);
-        return pipelineSQLBuilder.buildInsertSQLOnDuplicatePart(schemaName, 
dataRecord).map(optional -> insertSQL + " " + optional).orElse(insertSQL);
+        return dialectSQLBuilder.buildInsertSQLOnDuplicateClause(schemaName, 
dataRecord).map(optional -> insertSQL + " " + optional).orElse(insertSQL);
     }
     
     private String buildInsertSQLInternal(final String schemaName, final 
String tableName, final List<Column> columns) {
         StringBuilder columnsLiteral = new StringBuilder();
         StringBuilder holder = new StringBuilder();
         for (Column each : columns) {
-            columnsLiteral.append(String.format("%s,", quote(each.getName())));
+            columnsLiteral.append(String.format("%s,", 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, each.getName())));
             holder.append("?,");
         }
         columnsLiteral.setLength(columnsLiteral.length() - 1);
@@ -196,7 +187,7 @@ public final class PipelineSQLBuilderEngine {
         }
         StringBuilder updatedColumnString = new StringBuilder();
         for (Column each : extractUpdatedColumns(dataRecord)) {
-            updatedColumnString.append(String.format("%s = ?,", 
quote(each.getName())));
+            updatedColumnString.append(String.format("%s = ?,", 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, each.getName())));
         }
         updatedColumnString.setLength(updatedColumnString.length() - 1);
         return String.format(sqlCacheMap.get(sqlCacheKey), 
updatedColumnString);
@@ -213,7 +204,7 @@ public final class PipelineSQLBuilderEngine {
      * @return filtered columns
      */
     public List<Column> extractUpdatedColumns(final DataRecord dataRecord) {
-        return pipelineSQLBuilder.extractUpdatedColumns(dataRecord);
+        return dialectSQLBuilder.extractUpdatedColumns(dataRecord);
     }
     
     /**
@@ -250,7 +241,7 @@ public final class PipelineSQLBuilderEngine {
     private String buildWhereSQL(final Collection<Column> conditionColumns) {
         StringBuilder where = new StringBuilder();
         for (Column each : conditionColumns) {
-            where.append(String.format("%s = ? AND ", quote(each.getName())));
+            where.append(String.format("%s = ? AND ", 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, each.getName())));
         }
         where.setLength(where.length() - 5);
         return where.toString();
@@ -275,7 +266,7 @@ public final class PipelineSQLBuilderEngine {
      * @return estimated count SQL
      */
     public Optional<String> buildEstimatedCountSQL(final String schemaName, 
final String tableName) {
-        return pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, 
tableName);
+        return dialectSQLBuilder.buildEstimatedCountSQL(schemaName, tableName);
     }
     
     /**
@@ -287,7 +278,7 @@ public final class PipelineSQLBuilderEngine {
      * @return min max unique key SQL
      */
     public String buildUniqueKeyMinMaxValuesSQL(final String schemaName, final 
String tableName, final String uniqueKey) {
-        String quotedUniqueKey = quote(uniqueKey);
+        String quotedUniqueKey = 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, uniqueKey);
         return String.format("SELECT MIN(%s), MAX(%s) FROM %s", 
quotedUniqueKey, quotedUniqueKey, getQualifiedTableName(schemaName, tableName));
     }
     
@@ -303,7 +294,7 @@ public final class PipelineSQLBuilderEngine {
      */
     public String buildQueryAllOrderingSQL(final String schemaName, final 
String tableName, final List<String> columnNames, final String uniqueKey, final 
boolean firstQuery) {
         String qualifiedTableName = getQualifiedTableName(schemaName, 
tableName);
-        String quotedUniqueKey = quote(uniqueKey);
+        String quotedUniqueKey = 
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, uniqueKey);
         return firstQuery
                 ? String.format("SELECT %s FROM %s ORDER BY %s ASC", 
buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey)
                 : String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s 
ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey, 
quotedUniqueKey);
@@ -317,7 +308,7 @@ public final class PipelineSQLBuilderEngine {
      * @return check SQL
      */
     public String buildCheckEmptySQL(final String schemaName, final String 
tableName) {
-        return pipelineSQLBuilder.buildCheckEmptySQL(schemaName, tableName);
+        return dialectSQLBuilder.buildCheckEmptySQL(schemaName, tableName);
     }
     
     /**
@@ -329,6 +320,6 @@ public final class PipelineSQLBuilderEngine {
      * @return CRC32 SQL
      */
     public Optional<String> buildCRC32SQL(final String schemaName, final 
String tableName, final String column) {
-        return pipelineSQLBuilder.buildCRC32SQL(schemaName, tableName, column);
+        return dialectSQLBuilder.buildCRC32SQL(schemaName, tableName, column);
     }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index 3564a2b3062..57ee91a4a07 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -22,6 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -33,7 +34,7 @@ import java.util.Optional;
 public final class MySQLPipelineSQLBuilder implements 
DialectPipelineSQLBuilder {
     
     @Override
-    public Optional<String> buildInsertSQLOnDuplicatePart(final String 
schemaName, final DataRecord dataRecord) {
+    public Optional<String> buildInsertSQLOnDuplicateClause(final String 
schemaName, final DataRecord dataRecord) {
         StringBuilder result = new StringBuilder(" ON DUPLICATE KEY UPDATE ");
         for (int i = 0; i < dataRecord.getColumnCount(); i++) {
             Column column = dataRecord.getColumn(i);
@@ -44,7 +45,8 @@ public final class MySQLPipelineSQLBuilder implements 
DialectPipelineSQLBuilder
             if (column.isUniqueKey()) {
                 continue;
             }
-            
result.append(quote(column.getName())).append("=VALUES(").append(quote(column.getName())).append("),");
+            
result.append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(), 
column.getName()))
+                    
.append("=VALUES(").append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(),
 column.getName())).append("),");
         }
         result.setLength(result.length() - 1);
         return Optional.of(result.toString());
@@ -62,7 +64,8 @@ public final class MySQLPipelineSQLBuilder implements 
DialectPipelineSQLBuilder
     
     @Override
     public Optional<String> buildCRC32SQL(final String schemaName, final 
String tableName, final String column) {
-        return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS 
UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM %s", quote(column), 
quote(tableName)));
+        return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS 
UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM %s",
+                DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(), 
column), DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(), tableName)));
     }
     
     @Override
@@ -71,10 +74,6 @@ public final class MySQLPipelineSQLBuilder implements 
DialectPipelineSQLBuilder
                 new 
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName, 
tableName)));
     }
     
-    private String quote(final String item) {
-        return getType().isReservedWord(item) ? 
getType().getQuoteCharacter().wrap(item) : item;
-    }
-    
     @Override
     public String getDatabaseType() {
         return "MySQL";
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index de50f74f720..8e2ede5dc0d 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -35,13 +35,13 @@ class MySQLPipelineSQLBuilderTest {
     
     @Test
     void assertBuildInsertSQLOnDuplicatePart() {
-        String actual = sqlBuilder.buildInsertSQLOnDuplicatePart(null, 
mockDataRecord("t1")).orElse(null);
+        String actual = sqlBuilder.buildInsertSQLOnDuplicateClause(null, 
mockDataRecord("t1")).orElse(null);
         assertThat(actual, is(" ON DUPLICATE KEY UPDATE 
c1=VALUES(c1),c2=VALUES(c2),c3=VALUES(c3)"));
     }
     
     @Test
     void assertBuildInsertSQLOnDuplicatePartHasShardingColumn() {
-        String actual = sqlBuilder.buildInsertSQLOnDuplicatePart(null, 
mockDataRecord("t2")).orElse(null);
+        String actual = sqlBuilder.buildInsertSQLOnDuplicateClause(null, 
mockDataRecord("t2")).orElse(null);
         assertThat(actual, is(" ON DUPLICATE KEY UPDATE 
c1=VALUES(c1),c2=VALUES(c2),c3=VALUES(c3)"));
     }
     
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 41cbfbfcd81..731f2dd1edf 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -21,6 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 
 import java.util.List;
 import java.util.Optional;
@@ -33,18 +34,19 @@ public final class OpenGaussPipelineSQLBuilder implements 
DialectPipelineSQLBuil
     
     @Override
     public Optional<String> buildCreateSchemaSQL(final String schemaName) {
-        return Optional.of(String.format("CREATE SCHEMA %s", 
quote(schemaName)));
+        return Optional.of(String.format("CREATE SCHEMA %s", 
DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(), schemaName)));
     }
     
     @Override
-    public Optional<String> buildInsertSQLOnDuplicatePart(final String 
schemaName, final DataRecord dataRecord) {
+    public Optional<String> buildInsertSQLOnDuplicateClause(final String 
schemaName, final DataRecord dataRecord) {
         StringBuilder result = new StringBuilder(" ON DUPLICATE KEY UPDATE ");
         for (int i = 0; i < dataRecord.getColumnCount(); i++) {
             Column column = dataRecord.getColumn(i);
             if (column.isUniqueKey()) {
                 continue;
             }
-            
result.append(quote(column.getName())).append("=EXCLUDED.").append(quote(column.getName())).append(',');
+            
result.append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(), 
column.getName()))
+                    
.append("=EXCLUDED.").append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(),
 column.getName())).append(',');
         }
         result.setLength(result.length() - 1);
         return Optional.of(result.toString());
@@ -66,10 +68,6 @@ public final class OpenGaussPipelineSQLBuilder implements 
DialectPipelineSQLBuil
                 new 
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName, 
tableName)));
     }
     
-    private String quote(final String item) {
-        return getType().isReservedWord(item) ? 
getType().getQuoteCharacter().wrap(item) : item;
-    }
-    
     @Override
     public String getDatabaseType() {
         return "openGauss";
diff --git a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
index 794bb029b4c..e761c17dc41 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
@@ -32,7 +32,7 @@ class OpenGaussPipelineSQLBuilderTest {
     
     @Test
     void assertBuildInsertSQLOnDuplicatePart() {
-        String actual = sqlBuilder.buildInsertSQLOnDuplicatePart(null, 
mockDataRecord("t1")).orElse(null);
+        String actual = sqlBuilder.buildInsertSQLOnDuplicateClause(null, 
mockDataRecord("t1")).orElse(null);
         assertThat(actual, is(" ON DUPLICATE KEY UPDATE 
c0=EXCLUDED.c0,c1=EXCLUDED.c1,c2=EXCLUDED.c2,c3=EXCLUDED.c3"));
     }
     
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index dc577516e10..adbcc12768f 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -22,6 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -34,11 +35,11 @@ public final class PostgreSQLPipelineSQLBuilder implements 
DialectPipelineSQLBui
     
     @Override
     public Optional<String> buildCreateSchemaSQL(final String schemaName) {
-        return Optional.of(String.format("CREATE SCHEMA IF NOT EXISTS %s", 
quote(schemaName)));
+        return Optional.of(String.format("CREATE SCHEMA IF NOT EXISTS %s", 
DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(), schemaName)));
     }
     
     @Override
-    public Optional<String> buildInsertSQLOnDuplicatePart(final String 
schemaName, final DataRecord dataRecord) {
+    public Optional<String> buildInsertSQLOnDuplicateClause(final String 
schemaName, final DataRecord dataRecord) {
         // TODO without unique key, job has been interrupted, which may lead 
to data duplication
         if (dataRecord.getUniqueKeyValue().isEmpty()) {
             return Optional.empty();
@@ -59,7 +60,8 @@ public final class PostgreSQLPipelineSQLBuilder implements 
DialectPipelineSQLBui
             if (column.isUniqueKey()) {
                 continue;
             }
-            
result.append(quote(column.getName())).append("=EXCLUDED.").append(quote(column.getName())).append(',');
+            
result.append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(), 
column.getName()))
+                    
.append("=EXCLUDED.").append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(),
 column.getName())).append(',');
         }
         result.setLength(result.length() - 1);
         return result.toString();
@@ -81,10 +83,6 @@ public final class PostgreSQLPipelineSQLBuilder implements 
DialectPipelineSQLBui
                 new 
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName, 
tableName)));
     }
     
-    private String quote(final String item) {
-        return getType().isReservedWord(item) ? 
getType().getQuoteCharacter().wrap(item) : item;
-    }
-    
     @Override
     public String getDatabaseType() {
         return "PostgreSQL";
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index ba4f6f6ef1f..c9dc7af4173 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -34,7 +34,7 @@ class PostgreSQLPipelineSQLBuilderTest {
     
     @Test
     void assertBuildInsertSQLOnDuplicatePart() {
-        String actual = sqlBuilder.buildInsertSQLOnDuplicatePart("schema1", 
mockDataRecord()).orElse(null);
+        String actual = sqlBuilder.buildInsertSQLOnDuplicateClause("schema1", 
mockDataRecord()).orElse(null);
         assertThat(actual, is(" ON CONFLICT (order_id) DO UPDATE SET 
user_id=EXCLUDED.user_id,status=EXCLUDED.status"));
     }
     

Reply via email to