This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e0940401b64 Add SQLSegmentBuilder (#27184)
e0940401b64 is described below
commit e0940401b6439062389cc3ece6d5ab602846e203
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Jul 13 23:41:43 2023 +0800
Add SQLSegmentBuilder (#27184)
* Move DatabaseTypeEngine.getQualifiedTableName()
* Add SQLSegmentBuilder
---
.../infra/database/type/DatabaseTypeEngine.java | 11 ----
.../infra/sqlbuilder/SQLSegmentBuilder.java | 57 ++++++++++++++++++
.../database/type/DatabaseTypeEngineTest.java | 10 ----
.../fixture/InfraBranchDatabaseTypeFixture.java | 13 +++--
.../infra/sqlbuilder/SQLSegmentBuilderTest.java | 52 +++++++++++++++++
.../sqlbuilder/PipelineSQLBuilderEngine.java | 68 +++++++++-------------
.../common/sqlbuilder/H2PipelineSQLBuilder.java | 3 +-
.../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java | 14 ++---
.../sqlbuilder/OpenGaussPipelineSQLBuilder.java | 14 ++---
.../sqlbuilder/PostgreSQLPipelineSQLBuilder.java | 15 +++--
.../core/fixture/H2PipelineSQLBuilder.java | 4 +-
11 files changed, 169 insertions(+), 92 deletions(-)
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
index 85b8f39ac10..65a8f04f8e5 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
@@ -201,15 +201,4 @@ public final class DatabaseTypeEngine {
}
return result;
}
-
- /**
- * Escape identifier if necessary.
- *
- * @param databaseType database type
- * @param identifier identifier to be processed
- * @return escaped identifier
- */
- public static String escapeIdentifierIfNecessary(final DatabaseType
databaseType, final String identifier) {
- return databaseType.isReservedWord(identifier) ?
databaseType.getQuoteCharacter().wrap(identifier) : identifier;
- }
}
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/sqlbuilder/SQLSegmentBuilder.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/sqlbuilder/SQLSegmentBuilder.java
new file mode 100644
index 00000000000..462fc5c7f4e
--- /dev/null
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/sqlbuilder/SQLSegmentBuilder.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.sqlbuilder;
+
+import com.google.common.base.Strings;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+
+/**
+ * SQL segment builder.
+ */
+@RequiredArgsConstructor
+public final class SQLSegmentBuilder {
+
+ private final DatabaseType databaseType;
+
+ /**
+ * Get escaped identifier.
+ *
+ * @param identifier identifier to be processed
+ * @return escaped identifier
+ */
+ public String getEscapedIdentifier(final String identifier) {
+ return databaseType.isReservedWord(identifier) ?
databaseType.getQuoteCharacter().wrap(identifier) : identifier;
+ }
+
+ /**
+ * Get qualified table name.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @return qualified table name
+ */
+ public String getQualifiedTableName(final String schemaName, final String
tableName) {
+ StringBuilder result = new StringBuilder();
+ if (databaseType.isSchemaAvailable() &&
!Strings.isNullOrEmpty(schemaName)) {
+ result.append(getEscapedIdentifier(schemaName)).append('.');
+ }
+ result.append(getEscapedIdentifier(tableName));
+ return result.toString();
+ }
+}
diff --git
a/infra/common/src/test/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngineTest.java
b/infra/common/src/test/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngineTest.java
index 9a66d004187..3a409d637ca 100644
---
a/infra/common/src/test/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngineTest.java
+++
b/infra/common/src/test/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngineTest.java
@@ -169,14 +169,4 @@ class DatabaseTypeEngineTest {
assertTrue(actual.contains(TypedSPILoader.getService(DatabaseType.class,
"MySQL")), "MySQL not present");
assertTrue(actual.contains(TypedSPILoader.getService(DatabaseType.class,
"MariaDB")), "MariaDB not present");
}
-
- @Test
- void assertEscapeIdentifierIfNecessary() {
-
assertThat(DatabaseTypeEngine.escapeIdentifierIfNecessary(TypedSPILoader.getService(DatabaseType.class,
"INFRA.TRUNK.FIXTURE"), "SELECT"), is("`SELECT`"));
- }
-
- @Test
- void assertEscapeIdentifierIfUnnecessary() {
-
assertThat(DatabaseTypeEngine.escapeIdentifierIfNecessary(TypedSPILoader.getService(DatabaseType.class,
"INFRA.TRUNK.FIXTURE"), "INSERT"), is("INSERT"));
- }
}
diff --git
a/infra/common/src/test/java/org/apache/shardingsphere/infra/fixture/InfraBranchDatabaseTypeFixture.java
b/infra/common/src/test/java/org/apache/shardingsphere/infra/fixture/InfraBranchDatabaseTypeFixture.java
index 21b1bbda434..6a4bbd8423b 100644
---
a/infra/common/src/test/java/org/apache/shardingsphere/infra/fixture/InfraBranchDatabaseTypeFixture.java
+++
b/infra/common/src/test/java/org/apache/shardingsphere/infra/fixture/InfraBranchDatabaseTypeFixture.java
@@ -56,12 +56,17 @@ public final class InfraBranchDatabaseTypeFixture
implements BranchDatabaseType
}
@Override
- public String getType() {
- return "INFRA.BRANCH.FIXTURE";
+ public DatabaseType getTrunkDatabaseType() {
+ return TypedSPILoader.getService(DatabaseType.class,
"INFRA.TRUNK.FIXTURE");
}
@Override
- public DatabaseType getTrunkDatabaseType() {
- return TypedSPILoader.getService(DatabaseType.class,
"INFRA.TRUNK.FIXTURE");
+ public boolean isSchemaAvailable() {
+ return true;
+ }
+
+ @Override
+ public String getType() {
+ return "INFRA.BRANCH.FIXTURE";
}
}
diff --git
a/infra/common/src/test/java/org/apache/shardingsphere/infra/sqlbuilder/SQLSegmentBuilderTest.java
b/infra/common/src/test/java/org/apache/shardingsphere/infra/sqlbuilder/SQLSegmentBuilderTest.java
new file mode 100644
index 00000000000..b45d19679d5
--- /dev/null
+++
b/infra/common/src/test/java/org/apache/shardingsphere/infra/sqlbuilder/SQLSegmentBuilderTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.sqlbuilder;
+
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+class SQLSegmentBuilderTest {
+
+ @Test
+ void assertGetEscapedIdentifier() {
+ SQLSegmentBuilder sqlSegmentBuilder = new
SQLSegmentBuilder(TypedSPILoader.getService(DatabaseType.class,
"INFRA.TRUNK.FIXTURE"));
+ assertThat(sqlSegmentBuilder.getEscapedIdentifier("SELECT"),
is("`SELECT`"));
+ }
+
+ @Test
+ void assertGetUnescapedIdentifier() {
+ SQLSegmentBuilder sqlSegmentBuilder = new
SQLSegmentBuilder(TypedSPILoader.getService(DatabaseType.class,
"INFRA.TRUNK.FIXTURE"));
+ assertThat(sqlSegmentBuilder.getEscapedIdentifier("INSERT"),
is("INSERT"));
+ }
+
+ @Test
+ void assertGetQualifiedTableNameWithUnsupportedSchema() {
+ SQLSegmentBuilder sqlSegmentBuilder = new
SQLSegmentBuilder(TypedSPILoader.getService(DatabaseType.class,
"INFRA.TRUNK.FIXTURE"));
+ assertThat(sqlSegmentBuilder.getQualifiedTableName("foo_schema",
"foo_tbl"), is("foo_tbl"));
+ }
+
+ @Test
+ void assertGetQualifiedTableNameWithSupportedSchema() {
+ SQLSegmentBuilder sqlSegmentBuilder = new
SQLSegmentBuilder(TypedSPILoader.getService(DatabaseType.class,
"INFRA.BRANCH.FIXTURE"));
+ assertThat(sqlSegmentBuilder.getQualifiedTableName("foo_schema",
"foo_tbl"), is("foo_schema.foo_tbl"));
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
index d1b325f678c..b8ca191dbb5 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
@@ -17,13 +17,12 @@
package org.apache.shardingsphere.data.pipeline.common.sqlbuilder;
-import com.google.common.base.Strings;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.infra.sqlbuilder.SQLSegmentBuilder;
import java.util.Collection;
import java.util.List;
@@ -43,15 +42,16 @@ public final class PipelineSQLBuilderEngine {
private static final String DELETE_SQL_CACHE_KEY_PREFIX = "DELETE_";
- private final ConcurrentMap<String, String> sqlCacheMap = new
ConcurrentHashMap<>();
+ private final DialectPipelineSQLBuilder dialectSQLBuilder;
- private final DatabaseType databaseType;
+ private final SQLSegmentBuilder sqlSegmentBuilder;
- private final DialectPipelineSQLBuilder dialectSQLBuilder;
+ private final ConcurrentMap<String, String> sqlCacheMap;
public PipelineSQLBuilderEngine(final DatabaseType databaseType) {
- this.databaseType = databaseType;
dialectSQLBuilder =
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class,
databaseType);
+ sqlSegmentBuilder = new SQLSegmentBuilder(databaseType);
+ sqlCacheMap = new ConcurrentHashMap<>();
}
/**
@@ -74,8 +74,8 @@ public final class PipelineSQLBuilderEngine {
* @return divisible inventory dump SQL
*/
public String buildDivisibleInventoryDumpSQL(final String schemaName,
final String tableName, final List<String> columnNames, final String uniqueKey)
{
- String qualifiedTableName = getQualifiedTableName(schemaName,
tableName);
- String quotedUniqueKey =
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, uniqueKey);
+ String qualifiedTableName =
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
+ String quotedUniqueKey =
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
return String.format("SELECT %s FROM %s WHERE %s>=? AND %s<=? ORDER BY
%s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey,
quotedUniqueKey, quotedUniqueKey);
}
@@ -83,7 +83,7 @@ public final class PipelineSQLBuilderEngine {
if (columnNames.isEmpty()) {
return "*";
}
- return columnNames.stream().map(each ->
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType,
each)).collect(Collectors.joining(","));
+ return
columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
}
/**
@@ -96,8 +96,8 @@ public final class PipelineSQLBuilderEngine {
* @return divisible inventory dump SQL without end value
*/
public String buildNoLimitedDivisibleInventoryDumpSQL(final String
schemaName, final String tableName, final List<String> columnNames, final
String uniqueKey) {
- String qualifiedTableName = getQualifiedTableName(schemaName,
tableName);
- String quotedUniqueKey =
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, uniqueKey);
+ String qualifiedTableName =
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
+ String quotedUniqueKey =
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
return String.format("SELECT %s FROM %s WHERE %s>=? ORDER BY %s ASC",
buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey,
quotedUniqueKey);
}
@@ -111,8 +111,8 @@ public final class PipelineSQLBuilderEngine {
* @return indivisible inventory dump SQL
*/
public String buildIndivisibleInventoryDumpSQL(final String schemaName,
final String tableName, final List<String> columnNames, final String uniqueKey)
{
- String qualifiedTableName = getQualifiedTableName(schemaName,
tableName);
- String quotedUniqueKey =
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, uniqueKey);
+ String qualifiedTableName =
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
+ String quotedUniqueKey =
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
return String.format("SELECT %s FROM %s ORDER BY %s ASC",
buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey);
}
@@ -124,26 +124,10 @@ public final class PipelineSQLBuilderEngine {
* @return inventory dump all SQL
*/
public String buildNoUniqueKeyInventoryDumpSQL(final String schemaName,
final String tableName) {
- String qualifiedTableName = getQualifiedTableName(schemaName,
tableName);
+ String qualifiedTableName =
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
return String.format("SELECT * FROM %s", qualifiedTableName);
}
- /**
- * Get qualified table name.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @return qualified table name
- */
- public String getQualifiedTableName(final String schemaName, final String
tableName) {
- StringBuilder result = new StringBuilder();
- if (databaseType.isSchemaAvailable() &&
!Strings.isNullOrEmpty(schemaName)) {
-
result.append(DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType,
schemaName)).append('.');
- }
-
result.append(DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType,
tableName));
- return result.toString();
- }
-
/**
* Build insert SQL.
*
@@ -164,12 +148,12 @@ public final class PipelineSQLBuilderEngine {
StringBuilder columnsLiteral = new StringBuilder();
StringBuilder holder = new StringBuilder();
for (Column each : columns) {
- columnsLiteral.append(String.format("%s,",
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, each.getName())));
+ columnsLiteral.append(String.format("%s,",
sqlSegmentBuilder.getEscapedIdentifier(each.getName())));
holder.append("?,");
}
columnsLiteral.setLength(columnsLiteral.length() - 1);
holder.setLength(holder.length() - 1);
- return String.format("INSERT INTO %s(%s) VALUES(%s)",
getQualifiedTableName(schemaName, tableName), columnsLiteral, holder);
+ return String.format("INSERT INTO %s(%s) VALUES(%s)",
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName), columnsLiteral,
holder);
}
/**
@@ -187,14 +171,14 @@ public final class PipelineSQLBuilderEngine {
}
StringBuilder updatedColumnString = new StringBuilder();
for (Column each : extractUpdatedColumns(dataRecord)) {
- updatedColumnString.append(String.format("%s = ?,",
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, each.getName())));
+ updatedColumnString.append(String.format("%s = ?,",
sqlSegmentBuilder.getEscapedIdentifier(each.getName())));
}
updatedColumnString.setLength(updatedColumnString.length() - 1);
return String.format(sqlCacheMap.get(sqlCacheKey),
updatedColumnString);
}
private String buildUpdateSQLInternal(final String schemaName, final
String tableName, final Collection<Column> conditionColumns) {
- return String.format("UPDATE %s SET %%s WHERE %s",
getQualifiedTableName(schemaName, tableName), buildWhereSQL(conditionColumns));
+ return String.format("UPDATE %s SET %%s WHERE %s",
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName),
buildWhereSQL(conditionColumns));
}
/**
@@ -231,17 +215,17 @@ public final class PipelineSQLBuilderEngine {
* @return drop SQL
*/
public String buildDropSQL(final String schemaName, final String
tableName) {
- return String.format("DROP TABLE IF EXISTS %s",
getQualifiedTableName(schemaName, tableName));
+ return String.format("DROP TABLE IF EXISTS %s",
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName));
}
private String buildDeleteSQLInternal(final String schemaName, final
String tableName, final Collection<Column> conditionColumns) {
- return String.format("DELETE FROM %s WHERE %s",
getQualifiedTableName(schemaName, tableName), buildWhereSQL(conditionColumns));
+ return String.format("DELETE FROM %s WHERE %s",
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName),
buildWhereSQL(conditionColumns));
}
private String buildWhereSQL(final Collection<Column> conditionColumns) {
StringBuilder where = new StringBuilder();
for (Column each : conditionColumns) {
- where.append(String.format("%s = ? AND ",
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, each.getName())));
+ where.append(String.format("%s = ? AND ",
sqlSegmentBuilder.getEscapedIdentifier(each.getName())));
}
where.setLength(where.length() - 5);
return where.toString();
@@ -255,7 +239,7 @@ public final class PipelineSQLBuilderEngine {
* @return count SQL
*/
public String buildCountSQL(final String schemaName, final String
tableName) {
- return String.format("SELECT COUNT(*) FROM %s",
getQualifiedTableName(schemaName, tableName));
+ return String.format("SELECT COUNT(*) FROM %s",
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName));
}
/**
@@ -278,8 +262,8 @@ public final class PipelineSQLBuilderEngine {
* @return min max unique key SQL
*/
public String buildUniqueKeyMinMaxValuesSQL(final String schemaName, final
String tableName, final String uniqueKey) {
- String quotedUniqueKey =
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, uniqueKey);
- return String.format("SELECT MIN(%s), MAX(%s) FROM %s",
quotedUniqueKey, quotedUniqueKey, getQualifiedTableName(schemaName, tableName));
+ String quotedUniqueKey =
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
+ return String.format("SELECT MIN(%s), MAX(%s) FROM %s",
quotedUniqueKey, quotedUniqueKey,
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName));
}
/**
@@ -293,8 +277,8 @@ public final class PipelineSQLBuilderEngine {
* @return query SQL
*/
public String buildQueryAllOrderingSQL(final String schemaName, final
String tableName, final List<String> columnNames, final String uniqueKey, final
boolean firstQuery) {
- String qualifiedTableName = getQualifiedTableName(schemaName,
tableName);
- String quotedUniqueKey =
DatabaseTypeEngine.escapeIdentifierIfNecessary(databaseType, uniqueKey);
+ String qualifiedTableName =
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
+ String quotedUniqueKey =
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
return firstQuery
? String.format("SELECT %s FROM %s ORDER BY %s ASC",
buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey)
: String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s
ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey,
quotedUniqueKey);
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/H2PipelineSQLBuilder.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/H2PipelineSQLBuilder.java
index 7ca85d2aa82..827fce59c1b 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/H2PipelineSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/H2PipelineSQLBuilder.java
@@ -21,6 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
+import org.apache.shardingsphere.infra.sqlbuilder.SQLSegmentBuilder;
import java.util.ArrayList;
import java.util.List;
@@ -35,7 +36,7 @@ public final class H2PipelineSQLBuilder implements
DialectPipelineSQLBuilder {
@Override
public String buildCheckEmptySQL(final String schemaName, final String
tableName) {
- return String.format("SELECT * FROM %s LIMIT 1", new
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName,
tableName));
+ return String.format("SELECT * FROM %s LIMIT 1", new
SQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName));
}
@Override
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index c3e2a0f107f..205cd75536d 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -20,9 +20,8 @@ package
org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
+import org.apache.shardingsphere.infra.sqlbuilder.SQLSegmentBuilder;
import java.util.ArrayList;
import java.util.List;
@@ -36,6 +35,7 @@ public final class MySQLPipelineSQLBuilder implements
DialectPipelineSQLBuilder
@Override
public Optional<String> buildInsertSQLOnDuplicateClause(final String
schemaName, final DataRecord dataRecord) {
StringBuilder result = new StringBuilder("ON DUPLICATE KEY UPDATE ");
+ SQLSegmentBuilder sqlSegmentBuilder = new SQLSegmentBuilder(getType());
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
Column column = dataRecord.getColumn(i);
if (!column.isUpdated()) {
@@ -45,8 +45,7 @@ public final class MySQLPipelineSQLBuilder implements
DialectPipelineSQLBuilder
if (column.isUniqueKey()) {
continue;
}
-
result.append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(),
column.getName()))
-
.append("=VALUES(").append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(),
column.getName())).append("),");
+
result.append(sqlSegmentBuilder.getEscapedIdentifier(column.getName())).append("=VALUES(").append(sqlSegmentBuilder.getEscapedIdentifier(column.getName())).append("),");
}
result.setLength(result.length() - 1);
return Optional.of(result.toString());
@@ -59,19 +58,20 @@ public final class MySQLPipelineSQLBuilder implements
DialectPipelineSQLBuilder
@Override
public String buildCheckEmptySQL(final String schemaName, final String
tableName) {
- return String.format("SELECT * FROM %s LIMIT 1", new
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName,
tableName));
+ return String.format("SELECT * FROM %s LIMIT 1", new
SQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName));
}
@Override
public Optional<String> buildCRC32SQL(final String schemaName, final
String tableName, final String column) {
+ SQLSegmentBuilder sqlSegmentBuilder = new SQLSegmentBuilder(getType());
return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS
UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM %s",
- DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(),
column), DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(), tableName)));
+ sqlSegmentBuilder.getEscapedIdentifier(column),
sqlSegmentBuilder.getEscapedIdentifier(tableName)));
}
@Override
public Optional<String> buildEstimatedCountSQL(final String schemaName,
final String tableName) {
return Optional.of(String.format("SELECT TABLE_ROWS FROM
INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = '%s'",
- new
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName,
tableName)));
+ new
SQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName)));
}
@Override
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 1f3f22fd3e3..4a463f8322e 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -19,9 +19,8 @@ package
org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
+import org.apache.shardingsphere.infra.sqlbuilder.SQLSegmentBuilder;
import java.util.List;
import java.util.Optional;
@@ -34,19 +33,20 @@ public final class OpenGaussPipelineSQLBuilder implements
DialectPipelineSQLBuil
@Override
public Optional<String> buildCreateSchemaSQL(final String schemaName) {
- return Optional.of(String.format("CREATE SCHEMA %s",
DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(), schemaName)));
+ SQLSegmentBuilder sqlSegmentBuilder = new SQLSegmentBuilder(getType());
+ return Optional.of(String.format("CREATE SCHEMA %s",
sqlSegmentBuilder.getEscapedIdentifier(schemaName)));
}
@Override
public Optional<String> buildInsertSQLOnDuplicateClause(final String
schemaName, final DataRecord dataRecord) {
StringBuilder result = new StringBuilder("ON DUPLICATE KEY UPDATE ");
+ SQLSegmentBuilder sqlSegmentBuilder = new SQLSegmentBuilder(getType());
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
Column column = dataRecord.getColumn(i);
if (column.isUniqueKey()) {
continue;
}
-
result.append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(),
column.getName()))
-
.append("=EXCLUDED.").append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(),
column.getName())).append(',');
+
result.append(sqlSegmentBuilder.getEscapedIdentifier(column.getName())).append("=EXCLUDED.").append(sqlSegmentBuilder.getEscapedIdentifier(column.getName())).append(',');
}
result.setLength(result.length() - 1);
return Optional.of(result.toString());
@@ -59,13 +59,13 @@ public final class OpenGaussPipelineSQLBuilder implements
DialectPipelineSQLBuil
@Override
public String buildCheckEmptySQL(final String schemaName, final String
tableName) {
- return String.format("SELECT * FROM %s LIMIT 1", new
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName,
tableName));
+ return String.format("SELECT * FROM %s LIMIT 1", new
SQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName));
}
@Override
public Optional<String> buildEstimatedCountSQL(final String schemaName,
final String tableName) {
return Optional.of(String.format("SELECT reltuples::integer FROM
pg_class WHERE oid='%s'::regclass::oid;",
- new
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName,
tableName)));
+ new
SQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName)));
}
@Override
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index cdf123ac0dc..3a8758019aa 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -20,9 +20,8 @@ package
org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
+import org.apache.shardingsphere.infra.sqlbuilder.SQLSegmentBuilder;
import java.util.ArrayList;
import java.util.List;
@@ -35,7 +34,8 @@ public final class PostgreSQLPipelineSQLBuilder implements
DialectPipelineSQLBui
@Override
public Optional<String> buildCreateSchemaSQL(final String schemaName) {
- return Optional.of(String.format("CREATE SCHEMA IF NOT EXISTS %s",
DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(), schemaName)));
+ SQLSegmentBuilder sqlSegmentBuilder = new SQLSegmentBuilder(getType());
+ return Optional.of(String.format("CREATE SCHEMA IF NOT EXISTS %s",
sqlSegmentBuilder.getEscapedIdentifier(schemaName)));
}
@Override
@@ -55,13 +55,13 @@ public final class PostgreSQLPipelineSQLBuilder implements
DialectPipelineSQLBui
}
result.setLength(result.length() - 1);
result.append(") DO UPDATE SET ");
+ SQLSegmentBuilder sqlSegmentBuilder = new SQLSegmentBuilder(getType());
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
Column column = dataRecord.getColumn(i);
if (column.isUniqueKey()) {
continue;
}
-
result.append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(),
column.getName()))
-
.append("=EXCLUDED.").append(DatabaseTypeEngine.escapeIdentifierIfNecessary(getType(),
column.getName())).append(',');
+
result.append(sqlSegmentBuilder.getEscapedIdentifier(column.getName())).append("=EXCLUDED.").append(sqlSegmentBuilder.getEscapedIdentifier(column.getName())).append(',');
}
result.setLength(result.length() - 1);
return result.toString();
@@ -74,13 +74,12 @@ public final class PostgreSQLPipelineSQLBuilder implements
DialectPipelineSQLBui
@Override
public String buildCheckEmptySQL(final String schemaName, final String
tableName) {
- return String.format("SELECT * FROM %s LIMIT 1", new
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName,
tableName));
+ return String.format("SELECT * FROM %s LIMIT 1", new
SQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName));
}
@Override
public Optional<String> buildEstimatedCountSQL(final String schemaName,
final String tableName) {
- return Optional.of(String.format("SELECT reltuples::integer FROM
pg_class WHERE oid='%s'::regclass::oid;",
- new
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName,
tableName)));
+ return Optional.of(String.format("SELECT reltuples::integer FROM
pg_class WHERE oid='%s'::regclass::oid;", new
SQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName)));
}
@Override
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
index f7ff57d8b2c..14c8be6b100 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
@@ -20,8 +20,8 @@ package
org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
+import org.apache.shardingsphere.infra.sqlbuilder.SQLSegmentBuilder;
import java.util.ArrayList;
import java.util.List;
@@ -36,7 +36,7 @@ public final class H2PipelineSQLBuilder implements
DialectPipelineSQLBuilder {
@Override
public String buildCheckEmptySQL(final String schemaName, final String
tableName) {
- return String.format("SELECT * FROM %s LIMIT 1", new
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName,
tableName));
+ return String.format("SELECT * FROM %s LIMIT 1", new
SQLSegmentBuilder(getType()).getQualifiedTableName(schemaName, tableName));
}
@Override