This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 61f9034d1d8 Refactor PipelineSQLBuilderEngine (#27199)
61f9034d1d8 is described below
commit 61f9034d1d8cb80f825a8f61df8d3ac133c2ce5b
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Jul 14 19:23:33 2023 +0800
Refactor PipelineSQLBuilderEngine (#27199)
* Refactor PipelineSQLBuilderEngine
* Refactor PipelineSQLBuilderEngine
* Refactor PipelineSQLBuilderEngine
---
.../spi/sqlbuilder/DialectPipelineSQLBuilder.java | 2 +-
.../sqlbuilder/PipelineSQLBuilderEngine.java | 35 ++++++++--------------
.../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java | 2 +-
.../sqlbuilder/MySQLPipelineSQLBuilderTest.java | 4 +--
.../sqlbuilder/OpenGaussPipelineSQLBuilder.java | 2 +-
.../OpenGaussPipelineSQLBuilderTest.java | 6 ++--
.../sqlbuilder/PostgreSQLPipelineSQLBuilder.java | 2 +-
.../PostgreSQLPipelineSQLBuilderTest.java | 2 +-
8 files changed, 23 insertions(+), 32 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java
index 0cd993db1b3..cf87727c563 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java
@@ -46,7 +46,7 @@ public interface DialectPipelineSQLBuilder extends
DatabaseTypedSPI {
* @param dataRecord data record
* @return on duplicate clause of insert SQL
*/
- default Optional<String> buildInsertSQLOnDuplicateClause(String
schemaName, DataRecord dataRecord) {
+ default Optional<String> buildInsertOnDuplicateClause(String schemaName,
DataRecord dataRecord) {
return Optional.empty();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
index 3d9e02b3a46..fd08c5d254e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
@@ -78,22 +78,16 @@ public final class PipelineSQLBuilderEngine {
public String buildInsertSQL(final String schemaName, final DataRecord
dataRecord) {
String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX +
dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
- sqlCacheMap.put(sqlCacheKey, buildInsertSQLInternal(schemaName,
dataRecord.getTableName(), dataRecord.getColumns()));
+ String insertMainClause = buildInsertMainClause(schemaName,
dataRecord.getTableName(), dataRecord.getColumns());
+ sqlCacheMap.put(sqlCacheKey,
dialectSQLBuilder.buildInsertOnDuplicateClause(schemaName,
dataRecord).map(optional -> insertMainClause + " " +
optional).orElse(insertMainClause));
}
- String insertSQL = sqlCacheMap.get(sqlCacheKey);
- return dialectSQLBuilder.buildInsertSQLOnDuplicateClause(schemaName,
dataRecord).map(optional -> insertSQL + " " + optional).orElse(insertSQL);
+ return sqlCacheMap.get(sqlCacheKey);
}
- private String buildInsertSQLInternal(final String schemaName, final
String tableName, final List<Column> columns) {
- StringBuilder columnsLiteral = new StringBuilder();
- StringBuilder holder = new StringBuilder();
- for (Column each : columns) {
- columnsLiteral.append(String.format("%s,",
sqlSegmentBuilder.getEscapedIdentifier(each.getName())));
- holder.append("?,");
- }
- columnsLiteral.setLength(columnsLiteral.length() - 1);
- holder.setLength(holder.length() - 1);
- return String.format("INSERT INTO %s(%s) VALUES(%s)",
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName), columnsLiteral,
holder);
+ private String buildInsertMainClause(final String schemaName, final String
tableName, final List<Column> columns) {
+ String columnsLiteral = columns.stream().map(each ->
sqlSegmentBuilder.getEscapedIdentifier(each.getName())).collect(Collectors.joining(","));
+ String valuesLiteral = columns.stream().map(each ->
"?").collect(Collectors.joining(","));
+ return String.format("INSERT INTO %s(%s) VALUES(%s)",
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName), columnsLiteral,
valuesLiteral);
}
/**
@@ -107,18 +101,15 @@ public final class PipelineSQLBuilderEngine {
public String buildUpdateSQL(final String schemaName, final DataRecord
dataRecord, final Collection<Column> conditionColumns) {
String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX +
dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
- sqlCacheMap.put(sqlCacheKey, buildUpdateSQLInternal(schemaName,
dataRecord.getTableName(), conditionColumns));
+ String updateMainClause = buildUpdateMainClause(schemaName,
dataRecord.getTableName(), extractUpdatedColumns(dataRecord), conditionColumns);
+ sqlCacheMap.put(sqlCacheKey, updateMainClause);
}
- StringBuilder updatedColumnString = new StringBuilder();
- for (Column each : extractUpdatedColumns(dataRecord)) {
- updatedColumnString.append(String.format("%s = ?,",
sqlSegmentBuilder.getEscapedIdentifier(each.getName())));
- }
- updatedColumnString.setLength(updatedColumnString.length() - 1);
- return String.format(sqlCacheMap.get(sqlCacheKey),
updatedColumnString);
+ return sqlCacheMap.get(sqlCacheKey);
}
- private String buildUpdateSQLInternal(final String schemaName, final
String tableName, final Collection<Column> conditionColumns) {
- return String.format("UPDATE %s SET %%s WHERE %s",
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName),
buildWhereSQL(conditionColumns));
+ private String buildUpdateMainClause(final String schemaName, final String
tableName, final Collection<Column> setColumns, final Collection<Column>
conditionColumns) {
+ String updateSetClause = setColumns.stream().map(each ->
sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + " =
?").collect(Collectors.joining(","));
+ return String.format("UPDATE %s SET %s WHERE %s",
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName),
updateSetClause, buildWhereSQL(conditionColumns));
}
/**
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index f99330bdfda..21593060770 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -33,7 +33,7 @@ import java.util.Optional;
public final class MySQLPipelineSQLBuilder implements
DialectPipelineSQLBuilder {
@Override
- public Optional<String> buildInsertSQLOnDuplicateClause(final String
schemaName, final DataRecord dataRecord) {
+ public Optional<String> buildInsertOnDuplicateClause(final String
schemaName, final DataRecord dataRecord) {
StringBuilder result = new StringBuilder("ON DUPLICATE KEY UPDATE ");
PipelineSQLSegmentBuilder sqlSegmentBuilder = new
PipelineSQLSegmentBuilder(getType());
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index 50b725c6ccc..bedd29843f6 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -35,13 +35,13 @@ class MySQLPipelineSQLBuilderTest {
@Test
void assertBuildInsertSQLOnDuplicateClause() {
- String actual = sqlBuilder.buildInsertSQLOnDuplicateClause(null,
mockDataRecord("t1")).orElse(null);
+ String actual = sqlBuilder.buildInsertOnDuplicateClause(null,
mockDataRecord("t1")).orElse(null);
assertThat(actual, is("ON DUPLICATE KEY UPDATE
c1=VALUES(c1),c2=VALUES(c2),c3=VALUES(c3)"));
}
@Test
void assertBuildInsertSQLOnDuplicateClauseHasShardingColumn() {
- String actual = sqlBuilder.buildInsertSQLOnDuplicateClause(null,
mockDataRecord("t2")).orElse(null);
+ String actual = sqlBuilder.buildInsertOnDuplicateClause(null,
mockDataRecord("t2")).orElse(null);
assertThat(actual, is("ON DUPLICATE KEY UPDATE
c1=VALUES(c1),c2=VALUES(c2),c3=VALUES(c3)"));
}
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index bf4ddb6cfe6..e70c1f42c0d 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -38,7 +38,7 @@ public final class OpenGaussPipelineSQLBuilder implements
DialectPipelineSQLBuil
}
@Override
- public Optional<String> buildInsertSQLOnDuplicateClause(final String
schemaName, final DataRecord dataRecord) {
+ public Optional<String> buildInsertOnDuplicateClause(final String
schemaName, final DataRecord dataRecord) {
StringBuilder result = new StringBuilder("ON DUPLICATE KEY UPDATE ");
PipelineSQLSegmentBuilder sqlSegmentBuilder = new
PipelineSQLSegmentBuilder(getType());
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
index 22283a5e2f3..20112e39018 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
@@ -32,12 +32,12 @@ class OpenGaussPipelineSQLBuilderTest {
@Test
void assertBuildInsertSQLOnDuplicatePart() {
- String actual = sqlBuilder.buildInsertSQLOnDuplicateClause(null,
mockDataRecord("t1")).orElse(null);
+ String actual = sqlBuilder.buildInsertOnDuplicateClause(null,
mockDataRecord()).orElse(null);
assertThat(actual, is("ON DUPLICATE KEY UPDATE
c0=EXCLUDED.c0,c1=EXCLUDED.c1,c2=EXCLUDED.c2,c3=EXCLUDED.c3"));
}
- private DataRecord mockDataRecord(final String tableName) {
- DataRecord result = new DataRecord(IngestDataChangeType.INSERT,
tableName, new PlaceholderPosition(), 4);
+ private DataRecord mockDataRecord() {
+ DataRecord result = new DataRecord(IngestDataChangeType.INSERT, "t1",
new PlaceholderPosition(), 4);
result.addColumn(new Column("id", "", false, true));
result.addColumn(new Column("c0", "", false, false));
result.addColumn(new Column("c1", "", true, false));
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 65c6e954a73..fbc415c4736 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -39,7 +39,7 @@ public final class PostgreSQLPipelineSQLBuilder implements
DialectPipelineSQLBui
}
@Override
- public Optional<String> buildInsertSQLOnDuplicateClause(final String
schemaName, final DataRecord dataRecord) {
+ public Optional<String> buildInsertOnDuplicateClause(final String
schemaName, final DataRecord dataRecord) {
// TODO without unique key, job has been interrupted, which may lead
to data duplication
if (dataRecord.getUniqueKeyValue().isEmpty()) {
return Optional.empty();
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index 3c55505c0b4..a2d4e99b7e9 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -34,7 +34,7 @@ class PostgreSQLPipelineSQLBuilderTest {
@Test
void assertBuildInsertSQLOnDuplicateClause() {
- String actual = sqlBuilder.buildInsertSQLOnDuplicateClause("schema1",
mockDataRecord()).orElse(null);
+ String actual = sqlBuilder.buildInsertOnDuplicateClause("schema1",
mockDataRecord()).orElse(null);
assertThat(actual, is("ON CONFLICT (order_id) DO UPDATE SET
user_id=EXCLUDED.user_id,status=EXCLUDED.status"));
}