This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6b85968da94 Add PipelineSQLBuilderEngine (#27090)
6b85968da94 is described below
commit 6b85968da947762c7f8d425055a42a738ed49af7
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Jul 13 15:02:46 2023 +0800
Add PipelineSQLBuilderEngine (#27090)
* Refactor AbstractPipelineSQLBuilder
* Refactor FixturePipelineSQLBuilder
* Refactor AbstractPipelineSQLBuilder
* Add PipelineSQLBuilderEngine
* Add PipelineSQLBuilderEngine
---
.../spi/sqlbuilder/DialectPipelineSQLBuilder.java | 89 ++++++++++
.../spi/sqlbuilder/PipelineSQLBuilder.java | 192 ---------------------
...LBuilder.java => PipelineSQLBuilderEngine.java} | 190 ++++++++++++++++----
...RC32MatchDataConsistencyCalculateAlgorithm.java | 11 +-
...DataMatchDataConsistencyCalculateAlgorithm.java | 7 +-
.../data/pipeline/core/dumper/InventoryDumper.java | 15 +-
.../core/importer/sink/PipelineDataSourceSink.java | 19 +-
.../preparer/InventoryRecordsCountCalculator.java | 11 +-
.../core/preparer/InventoryTaskSplitter.java | 7 +-
.../datasource/AbstractDataSourcePreparer.java | 7 +-
.../checker/AbstractDataSourceChecker.java | 6 +-
.../sqlbuilder/FixturePipelineSQLBuilder.java | 64 +------
.../common/sqlbuilder/H2PipelineSQLBuilder.java | 20 ++-
...Test.java => PipelineSQLBuilderEngineTest.java} | 41 ++---
...eline.spi.sqlbuilder.DialectPipelineSQLBuilder} | 0
.../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java | 39 ++---
...eline.spi.sqlbuilder.DialectPipelineSQLBuilder} | 0
.../sqlbuilder/MySQLPipelineSQLBuilderTest.java | 21 +--
.../sqlbuilder/OpenGaussPipelineSQLBuilder.java | 43 ++---
...eline.spi.sqlbuilder.DialectPipelineSQLBuilder} | 0
.../OpenGaussPipelineSQLBuilderTest.java | 26 +--
.../sqlbuilder/PostgreSQLPipelineSQLBuilder.java | 39 +++--
...eline.spi.sqlbuilder.DialectPipelineSQLBuilder} | 0
.../PostgreSQLPipelineSQLBuilderTest.java | 27 +--
.../migration/api/impl/MigrationJobAPI.java | 9 +-
.../core/fixture/H2PipelineSQLBuilder.java | 20 +--
...eline.spi.sqlbuilder.DialectPipelineSQLBuilder} | 0
27 files changed, 402 insertions(+), 501 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java
new file mode 100644
index 00000000000..793f35177db
--- /dev/null
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.sqlbuilder;
+
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPI;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Dialect pipeline SQL builder.
+ */
+public interface DialectPipelineSQLBuilder extends DatabaseTypedSPI {
+
+ /**
+ * Build create schema SQL.
+ *
+ * @param schemaName schema name
+ * @return create schema SQL
+ */
+ default Optional<String> buildCreateSchemaSQL(String schemaName) {
+ return Optional.empty();
+ }
+
+ /**
+ * Build insert SQL on duplicate part.
+ *
+ * @param schemaName schema name
+ * @param dataRecord data record
+ * @return insert SQL on duplicate part
+ */
+ default Optional<String> buildInsertSQLOnDuplicatePart(String schemaName, DataRecord dataRecord) {
+ return Optional.empty();
+ }
+
+ /**
+ * Extract updated columns.
+ *
+ * @param dataRecord data record
+ * @return filtered columns
+ */
+ List<Column> extractUpdatedColumns(DataRecord dataRecord);
+
+ /**
+ * Build estimated count SQL.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @return estimated count SQL
+ */
+ Optional<String> buildEstimatedCountSQL(String schemaName, String tableName);
+
+ /**
+ * Build CRC32 SQL.
+ *
+ * @param schemaName schema name
+ * @param tableName table Name
+ * @param column column
+ * @return CRC32 SQL
+ */
+ default Optional<String> buildCRC32SQL(final String schemaName, final String tableName, final String column) {
+ return Optional.empty();
+ }
+
+ /**
+ * Judge whether keyword.
+ *
+ * @param item item to be judged
+ * @return is keyword or not
+ */
+ boolean isKeyword(String item);
+}
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
deleted file mode 100644
index 646f1dc907a..00000000000
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.sqlbuilder;
-
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPI;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-
-/**
- * Pipeline SQL builder.
- */
-public interface PipelineSQLBuilder extends DatabaseTypedSPI {
-
- /**
- * Build create schema SQL.
- *
- * @param schemaName schema name
- * @return create schema SQL
- */
- default Optional<String> buildCreateSchemaSQL(String schemaName) {
- return Optional.empty();
- }
-
- /**
- * Build divisible inventory dump SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @param columnNames column names
- * @param uniqueKey unique key
- * @return divisible inventory dump SQL
- */
- String buildDivisibleInventoryDumpSQL(String schemaName, String tableName,
List<String> columnNames, String uniqueKey);
-
- /**
- * Build divisible inventory dump SQL without end value.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @param columnNames column names
- * @param uniqueKey unique key
- * @return divisible inventory dump SQL without end value
- */
- String buildDivisibleInventoryDumpSQLNoEnd(String schemaName, String
tableName, List<String> columnNames, String uniqueKey);
-
- /**
- * Build indivisible inventory dump first SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @param columnNames column names
- * @param uniqueKey unique key
- * @return indivisible inventory dump SQL
- */
- String buildIndivisibleInventoryDumpSQL(String schemaName, String
tableName, List<String> columnNames, String uniqueKey);
-
- /**
- * Build no unique key inventory dump SQL.
- *
- * @param schemaName schema name
- * @param tableName tableName
- * @return inventory dump all SQL
- */
- String buildNoUniqueKeyInventoryDumpSQL(String schemaName, String
tableName);
-
- /**
- * Build insert SQL.
- *
- * @param schemaName schema name
- * @param dataRecord data record
- * @return insert SQL
- */
- String buildInsertSQL(String schemaName, DataRecord dataRecord);
-
- /**
- * Build update SQL.
- *
- * @param schemaName schema name
- * @param dataRecord data record
- * @param conditionColumns condition columns
- * @return update SQL
- */
- String buildUpdateSQL(String schemaName, DataRecord dataRecord,
Collection<Column> conditionColumns);
-
- /**
- * Extract updated columns.
- *
- * @param dataRecord data record
- * @return filtered columns
- */
- // TODO Consider remove extractUpdatedColumns. openGauss has special impl
currently
- List<Column> extractUpdatedColumns(DataRecord dataRecord);
-
- /**
- * Build delete SQL.
- *
- * @param schemaName schema name
- * @param dataRecord data record
- * @param conditionColumns condition columns
- * @return delete SQL
- */
- String buildDeleteSQL(String schemaName, DataRecord dataRecord,
Collection<Column> conditionColumns);
-
- /**
- * Build drop SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @return drop SQL
- */
- String buildDropSQL(String schemaName, String tableName);
-
- /**
- * Build count SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @return count SQL
- */
- String buildCountSQL(String schemaName, String tableName);
-
- /**
- * Build estimated count SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @return estimated count SQL
- */
- Optional<String> buildEstimatedCountSQL(String schemaName, String
tableName);
-
- /**
- * Build unique key minimum maximum values SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @param uniqueKey unique key
- * @return min max unique key SQL
- */
- String buildUniqueKeyMinMaxValuesSQL(String schemaName, String tableName,
String uniqueKey);
-
- /**
- * Build query all ordering SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @param columnNames column names
- * @param uniqueKey unique key, it may be primary key, not null
- * @param firstQuery first query
- * @return query SQL
- */
- String buildQueryAllOrderingSQL(String schemaName, String tableName,
List<String> columnNames, String uniqueKey, boolean firstQuery);
-
- /**
- * Build check empty SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @return check SQL
- */
- String buildCheckEmptySQL(String schemaName, String tableName);
-
- /**
- * Build CRC32 SQL.
- *
- * @param schemaName schema name
- * @param tableName table Name
- * @param column column
- * @return CRC32 SQL
- */
- default Optional<String> buildCRC32SQL(final String schemaName, final
String tableName, final String column) {
- return Optional.empty();
- }
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/AbstractPipelineSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
similarity index 62%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/AbstractPipelineSQLBuilder.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
index 0c6d23c5262..48e2f3c699a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/AbstractPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
@@ -20,20 +20,21 @@ package
org.apache.shardingsphere.data.pipeline.common.sqlbuilder;
import com.google.common.base.Strings;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
-import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
- * Abstract pipeline SQL builder.
+ * Pipeline SQL builder engine.
*/
-public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder
{
+public final class PipelineSQLBuilderEngine {
private static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
@@ -43,6 +44,15 @@ public abstract class AbstractPipelineSQLBuilder implements
PipelineSQLBuilder {
private final ConcurrentMap<String, String> sqlCacheMap = new
ConcurrentHashMap<>();
+ private final DatabaseType databaseType;
+
+ private final DialectPipelineSQLBuilder pipelineSQLBuilder;
+
+ public PipelineSQLBuilderEngine(final DatabaseType databaseType) {
+ this.databaseType = databaseType;
+ pipelineSQLBuilder = DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, databaseType);
+ }
+
/**
* Add left and right identifier quote string.
*
@@ -50,26 +60,28 @@ public abstract class AbstractPipelineSQLBuilder implements
PipelineSQLBuilder {
* @return add quote string
*/
public String quote(final String item) {
- return isKeyword(item) ? getLeftIdentifierQuoteString() + item +
getRightIdentifierQuoteString() : item;
+ return pipelineSQLBuilder.isKeyword(item) ? databaseType.getQuoteCharacter().wrap(item) : item;
}
- protected abstract boolean isKeyword(String item);
-
/**
- * Get left identifier quote string.
+ * Build create schema SQL.
*
- * @return string
+ * @param schemaName schema name
+ * @return create schema SQL
*/
- protected abstract String getLeftIdentifierQuoteString();
+ public Optional<String> buildCreateSchemaSQL(final String schemaName) {
+ return pipelineSQLBuilder.buildCreateSchemaSQL(schemaName);
+ }
/**
- * Get right identifier quote string.
+ * Build divisible inventory dump SQL.
*
- * @return string
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param columnNames column names
+ * @param uniqueKey unique key
+ * @return divisible inventory dump SQL
*/
- protected abstract String getRightIdentifierQuoteString();
-
- @Override
public String buildDivisibleInventoryDumpSQL(final String schemaName,
final String tableName, final List<String> columnNames, final String uniqueKey)
{
String qualifiedTableName = getQualifiedTableName(schemaName,
tableName);
String quotedUniqueKey = quote(uniqueKey);
@@ -83,42 +95,78 @@ public abstract class AbstractPipelineSQLBuilder implements
PipelineSQLBuilder {
return
columnNames.stream().map(this::quote).collect(Collectors.joining(","));
}
- @Override
- public String buildDivisibleInventoryDumpSQLNoEnd(final String schemaName,
final String tableName, final List<String> columnNames, final String uniqueKey)
{
+ /**
+ * Build divisible inventory dump SQL without limited value.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param columnNames column names
+ * @param uniqueKey unique key
+ * @return divisible inventory dump SQL without end value
+ */
+ public String buildNoLimitedDivisibleInventoryDumpSQL(final String
schemaName, final String tableName, final List<String> columnNames, final
String uniqueKey) {
String qualifiedTableName = getQualifiedTableName(schemaName,
tableName);
String quotedUniqueKey = quote(uniqueKey);
return String.format("SELECT %s FROM %s WHERE %s>=? ORDER BY %s ASC",
buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey,
quotedUniqueKey);
}
- @Override
+ /**
+ * Build indivisible inventory dump first SQL.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param columnNames column names
+ * @param uniqueKey unique key
+ * @return indivisible inventory dump SQL
+ */
public String buildIndivisibleInventoryDumpSQL(final String schemaName,
final String tableName, final List<String> columnNames, final String uniqueKey)
{
String qualifiedTableName = getQualifiedTableName(schemaName,
tableName);
String quotedUniqueKey = quote(uniqueKey);
return String.format("SELECT %s FROM %s ORDER BY %s ASC",
buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey);
}
- @Override
+ /**
+ * Build no unique key inventory dump SQL.
+ *
+ * @param schemaName schema name
+ * @param tableName tableName
+ * @return inventory dump all SQL
+ */
public String buildNoUniqueKeyInventoryDumpSQL(final String schemaName,
final String tableName) {
String qualifiedTableName = getQualifiedTableName(schemaName,
tableName);
return String.format("SELECT * FROM %s", qualifiedTableName);
}
- protected final String getQualifiedTableName(final String schemaName,
final String tableName) {
+ /**
+ * Get qualified table name.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @return qualified table name
+ */
+ public String getQualifiedTableName(final String schemaName, final String
tableName) {
StringBuilder result = new StringBuilder();
- if (getType().isSchemaAvailable() &&
!Strings.isNullOrEmpty(schemaName)) {
+ if (databaseType.isSchemaAvailable() &&
!Strings.isNullOrEmpty(schemaName)) {
result.append(quote(schemaName)).append('.');
}
result.append(quote(tableName));
return result.toString();
}
- @Override
+ /**
+ * Build insert SQL.
+ *
+ * @param schemaName schema name
+ * @param dataRecord data record
+ * @return insert SQL
+ */
public String buildInsertSQL(final String schemaName, final DataRecord
dataRecord) {
String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX +
dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
sqlCacheMap.put(sqlCacheKey, buildInsertSQLInternal(schemaName,
dataRecord.getTableName(), dataRecord.getColumns()));
}
- return sqlCacheMap.get(sqlCacheKey);
+ String insertSQL = sqlCacheMap.get(sqlCacheKey);
+ return pipelineSQLBuilder.buildInsertSQLOnDuplicatePart(schemaName, dataRecord).map(optional -> insertSQL + " " + optional).orElse(insertSQL);
}
private String buildInsertSQLInternal(final String schemaName, final
String tableName, final List<Column> columns) {
@@ -133,7 +181,14 @@ public abstract class AbstractPipelineSQLBuilder
implements PipelineSQLBuilder {
return String.format("INSERT INTO %s(%s) VALUES(%s)",
getQualifiedTableName(schemaName, tableName), columnsLiteral, holder);
}
- @Override
+ /**
+ * Build update SQL.
+ *
+ * @param schemaName schema name
+ * @param dataRecord data record
+ * @param conditionColumns condition columns
+ * @return update SQL
+ */
public String buildUpdateSQL(final String schemaName, final DataRecord
dataRecord, final Collection<Column> conditionColumns) {
String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX +
dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
@@ -151,12 +206,24 @@ public abstract class AbstractPipelineSQLBuilder
implements PipelineSQLBuilder {
return String.format("UPDATE %s SET %%s WHERE %s",
getQualifiedTableName(schemaName, tableName), buildWhereSQL(conditionColumns));
}
- @Override
+ /**
+ * Extract updated columns.
+ *
+ * @param dataRecord data record
+ * @return filtered columns
+ */
public List<Column> extractUpdatedColumns(final DataRecord dataRecord) {
- return new ArrayList<>(RecordUtils.extractUpdatedColumns(dataRecord));
+ return pipelineSQLBuilder.extractUpdatedColumns(dataRecord);
}
- @Override
+ /**
+ * Build delete SQL.
+ *
+ * @param schemaName schema name
+ * @param dataRecord data record
+ * @param conditionColumns condition columns
+ * @return delete SQL
+ */
public String buildDeleteSQL(final String schemaName, final DataRecord
dataRecord, final Collection<Column> conditionColumns) {
String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX +
dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
@@ -165,7 +232,13 @@ public abstract class AbstractPipelineSQLBuilder
implements PipelineSQLBuilder {
return sqlCacheMap.get(sqlCacheKey);
}
- @Override
+ /**
+ * Build drop SQL.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @return drop SQL
+ */
public String buildDropSQL(final String schemaName, final String
tableName) {
return String.format("DROP TABLE IF EXISTS %s",
getQualifiedTableName(schemaName, tableName));
}
@@ -183,18 +256,51 @@ public abstract class AbstractPipelineSQLBuilder
implements PipelineSQLBuilder {
return where.toString();
}
- @Override
+ /**
+ * Build count SQL.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @return count SQL
+ */
public String buildCountSQL(final String schemaName, final String
tableName) {
return String.format("SELECT COUNT(*) FROM %s",
getQualifiedTableName(schemaName, tableName));
}
- @Override
+ /**
+ * Build estimated count SQL.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @return estimated count SQL
+ */
+ public Optional<String> buildEstimatedCountSQL(final String schemaName,
final String tableName) {
+ return pipelineSQLBuilder.buildEstimatedCountSQL(schemaName,
tableName);
+ }
+
+ /**
+ * Build unique key minimum maximum values SQL.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param uniqueKey unique key
+ * @return min max unique key SQL
+ */
public String buildUniqueKeyMinMaxValuesSQL(final String schemaName, final
String tableName, final String uniqueKey) {
String quotedUniqueKey = quote(uniqueKey);
return String.format("SELECT MIN(%s), MAX(%s) FROM %s",
quotedUniqueKey, quotedUniqueKey, getQualifiedTableName(schemaName, tableName));
}
- @Override
+ /**
+ * Build query all ordering SQL.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param columnNames column names
+ * @param uniqueKey unique key, it may be primary key, not null
+ * @param firstQuery first query
+ * @return query SQL
+ */
public String buildQueryAllOrderingSQL(final String schemaName, final
String tableName, final List<String> columnNames, final String uniqueKey, final
boolean firstQuery) {
String qualifiedTableName = getQualifiedTableName(schemaName,
tableName);
String quotedUniqueKey = quote(uniqueKey);
@@ -203,8 +309,26 @@ public abstract class AbstractPipelineSQLBuilder
implements PipelineSQLBuilder {
: String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s
ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey,
quotedUniqueKey);
}
- @Override
+ /**
+ * Build check empty SQL.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @return check SQL
+ */
public String buildCheckEmptySQL(final String schemaName, final String
tableName) {
return String.format("SELECT * FROM %s LIMIT 1",
getQualifiedTableName(schemaName, tableName));
}
+
+ /**
+ * Build CRC32 SQL.
+ *
+ * @param schemaName schema name
+ * @param tableName table Name
+ * @param column column
+ * @return CRC32 SQL
+ */
+ public Optional<String> buildCRC32SQL(final String schemaName, final
String tableName, final String column) {
+ return pipelineSQLBuilder.buildCRC32SQL(schemaName, tableName, column);
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index c4b098ae36a..58b52de71f2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -20,14 +20,13 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.algorithm;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataConsistencyCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedCRC32DataConsistencyCalculateAlgorithmException;
-import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
@@ -54,13 +53,13 @@ public final class
CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractD
@Override
public Iterable<DataConsistencyCalculatedResult> calculate(final
DataConsistencyCalculateParameter param) {
DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, param.getDatabaseType());
- PipelineSQLBuilder sqlBuilder =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, databaseType);
- List<CalculatedItem> calculatedItems =
param.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, param,
each)).collect(Collectors.toList());
+ PipelineSQLBuilderEngine sqlBuilderEngine = new
PipelineSQLBuilderEngine(databaseType);
+ List<CalculatedItem> calculatedItems =
param.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilderEngine,
param, each)).collect(Collectors.toList());
return Collections.singletonList(new
CalculatedResult(calculatedItems.get(0).getRecordsCount(),
calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
}
- private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder,
final DataConsistencyCalculateParameter param, final String columnName) {
- Optional<String> sql = sqlBuilder.buildCRC32SQL(param.getSchemaName(),
param.getLogicTableName(), columnName);
+ private CalculatedItem calculateCRC32(final PipelineSQLBuilderEngine
sqlBuilderEngine, final DataConsistencyCalculateParameter param, final String
columnName) {
+ Optional<String> sql =
sqlBuilderEngine.buildCRC32SQL(param.getSchemaName(),
param.getLogicTableName(), columnName);
ShardingSpherePreconditions.checkState(sql.isPresent(), () -> new
UnsupportedCRC32DataConsistencyCalculateAlgorithmException(param.getDatabaseType()));
try (
Connection connection = param.getDataSource().getConnection();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index f80ae4e6907..929b2d8d494 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.algorithm;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
import
org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataConsistencyCalculatedResult;
@@ -27,10 +28,8 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Data
import
org.apache.shardingsphere.data.pipeline.core.dumper.ColumnValueReaderEngine;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
@@ -165,9 +164,9 @@ public final class
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
throw new UnsupportedOperationException("Data consistency of
DATA_MATCH type not support table without unique key and primary key now");
}
DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, param.getDatabaseType());
- PipelineSQLBuilder sqlBuilder =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, databaseType);
+ PipelineSQLBuilderEngine sqlBuilderEngine = new
PipelineSQLBuilderEngine(databaseType);
boolean firstQuery = null == param.getTableCheckPosition();
- return sqlBuilder.buildQueryAllOrderingSQL(param.getSchemaName(),
param.getLogicTableName(), param.getColumnNames(),
param.getUniqueKey().getName(), firstQuery);
+ return
sqlBuilderEngine.buildQueryAllOrderingSQL(param.getSchemaName(),
param.getLogicTableName(), param.getColumnNames(),
param.getUniqueKey().getName(), firstQuery);
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index 86745e5e013..6717c6d8a44 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -40,15 +40,14 @@ import
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPo
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PrimaryKeyPosition;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PrimaryKeyPositionFactory;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
import
org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import javax.sql.DataSource;
@@ -77,7 +76,7 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
private final DataSource dataSource;
- private final PipelineSQLBuilder sqlBuilder;
+ private final PipelineSQLBuilderEngine sqlBuilderEngine;
private final ColumnValueReaderEngine columnValueReaderEngine;
@@ -90,7 +89,7 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
this.channel = channel;
this.dataSource = dataSource;
DatabaseType databaseType =
dumperConfig.getDataSourceConfig().getDatabaseType();
- sqlBuilder =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, databaseType);
+ sqlBuilderEngine = new PipelineSQLBuilderEngine(databaseType);
columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
this.metaDataLoader = metaDataLoader;
}
@@ -159,20 +158,20 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
LogicTableName logicTableName = new
LogicTableName(dumperConfig.getLogicTableName());
String schemaName = dumperConfig.getSchemaName(logicTableName);
if (!dumperConfig.hasUniqueKey()) {
- return sqlBuilder.buildNoUniqueKeyInventoryDumpSQL(schemaName,
dumperConfig.getActualTableName());
+ return
sqlBuilderEngine.buildNoUniqueKeyInventoryDumpSQL(schemaName,
dumperConfig.getActualTableName());
}
PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>)
dumperConfig.getPosition();
PipelineColumnMetaData firstColumn =
dumperConfig.getUniqueKeyColumns().get(0);
List<String> columnNames =
dumperConfig.getColumnNameList(logicTableName).orElse(Collections.singletonList("*"));
if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) ||
PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
if (null != position.getBeginValue() && null !=
position.getEndValue()) {
- return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName,
dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
+ return
sqlBuilderEngine.buildDivisibleInventoryDumpSQL(schemaName,
dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
}
if (null != position.getBeginValue() && null ==
position.getEndValue()) {
- return
sqlBuilder.buildDivisibleInventoryDumpSQLNoEnd(schemaName,
dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
+ return
sqlBuilderEngine.buildNoLimitedDivisibleInventoryDumpSQL(schemaName,
dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
}
}
- return sqlBuilder.buildIndivisibleInventoryDumpSQL(schemaName,
dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
+ return sqlBuilderEngine.buildIndivisibleInventoryDumpSQL(schemaName,
dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
}
private void setParameters(final PreparedStatement preparedStatement)
throws SQLException {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index f568a7a8f06..46f920cbd04 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -32,12 +32,11 @@ import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
import
org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
import
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
import org.apache.shardingsphere.data.pipeline.core.importer.DataRecordMerger;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -65,10 +64,10 @@ public final class PipelineDataSourceSink implements
PipelineSink {
private final PipelineDataSourceManager dataSourceManager;
- private final PipelineSQLBuilder pipelineSqlBuilder;
-
private final JobRateLimitAlgorithm rateLimitAlgorithm;
+ private final PipelineSQLBuilderEngine sqlBuilderEngine;
+
private final AtomicReference<Statement> batchInsertStatement = new
AtomicReference<>();
private final AtomicReference<Statement> updateStatement = new
AtomicReference<>();
@@ -77,9 +76,9 @@ public final class PipelineDataSourceSink implements
PipelineSink {
public PipelineDataSourceSink(final ImporterConfiguration importerConfig,
final PipelineDataSourceManager dataSourceManager) {
this.importerConfig = importerConfig;
- rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
this.dataSourceManager = dataSourceManager;
- pipelineSqlBuilder =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class,
importerConfig.getDataSourceConfig().getDatabaseType());
+ rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
+ sqlBuilderEngine = new
PipelineSQLBuilderEngine(importerConfig.getDataSourceConfig().getDatabaseType());
}
@Override
@@ -202,7 +201,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
private void executeBatchInsert(final Connection connection, final
List<DataRecord> dataRecords) throws SQLException {
DataRecord dataRecord = dataRecords.get(0);
- String insertSql =
pipelineSqlBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()),
dataRecord);
+ String insertSql =
sqlBuilderEngine.buildInsertSQL(getSchemaName(dataRecord.getTableName()),
dataRecord);
try (PreparedStatement preparedStatement =
connection.prepareStatement(insertSql)) {
batchInsertStatement.set(preparedStatement);
preparedStatement.setQueryTimeout(30);
@@ -231,8 +230,8 @@ public final class PipelineDataSourceSink implements
PipelineSink {
private void executeUpdate(final Connection connection, final DataRecord
dataRecord) throws SQLException {
Set<String> shardingColumns =
importerConfig.getShardingColumns(dataRecord.getTableName());
List<Column> conditionColumns =
RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
- List<Column> updatedColumns =
pipelineSqlBuilder.extractUpdatedColumns(dataRecord);
- String updateSql =
pipelineSqlBuilder.buildUpdateSQL(getSchemaName(dataRecord.getTableName()),
dataRecord, conditionColumns);
+ List<Column> updatedColumns =
sqlBuilderEngine.extractUpdatedColumns(dataRecord);
+ String updateSql =
sqlBuilderEngine.buildUpdateSQL(getSchemaName(dataRecord.getTableName()),
dataRecord, conditionColumns);
try (PreparedStatement preparedStatement =
connection.prepareStatement(updateSql)) {
updateStatement.set(preparedStatement);
for (int i = 0; i < updatedColumns.size(); i++) {
@@ -259,7 +258,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
private void executeBatchDelete(final Connection connection, final
List<DataRecord> dataRecords) throws SQLException {
DataRecord dataRecord = dataRecords.get(0);
- String deleteSQL =
pipelineSqlBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()),
dataRecord,
+ String deleteSQL =
sqlBuilderEngine.buildDeleteSQL(getSchemaName(dataRecord.getTableName()),
dataRecord,
RecordUtils.extractConditionColumns(dataRecord,
importerConfig.getShardingColumns(dataRecord.getTableName())));
try (PreparedStatement preparedStatement =
connection.prepareStatement(deleteSQL)) {
batchDeleteStatement.set(preparedStatement);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
index a5c25e268ed..5b77b34d244 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
@@ -23,9 +23,8 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
-import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
@@ -55,15 +54,15 @@ public final class InventoryRecordsCountCalculator {
public static long getTableRecordsCount(final InventoryDumperConfiguration
dumperConfig, final PipelineDataSourceWrapper dataSource) {
String schemaName = dumperConfig.getSchemaName(new
LogicTableName(dumperConfig.getLogicTableName()));
String actualTableName = dumperConfig.getActualTableName();
- PipelineSQLBuilder pipelineSQLBuilder =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class,
dataSource.getDatabaseType());
- Optional<String> sql =
pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
+ PipelineSQLBuilderEngine sqlBuilderEngine = new
PipelineSQLBuilderEngine(dataSource.getDatabaseType());
+ Optional<String> sql =
sqlBuilderEngine.buildEstimatedCountSQL(schemaName, actualTableName);
try {
if (sql.isPresent()) {
DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class,
dataSource.getDatabaseType().getType());
long result = getEstimatedCount(databaseType, dataSource,
sql.get());
- return result > 0 ? result : getCount(dataSource,
pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
+ return result > 0 ? result : getCount(dataSource,
sqlBuilderEngine.buildCountSQL(schemaName, actualTableName));
}
- return getCount(dataSource,
pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
+ return getCount(dataSource,
sqlBuilderEngine.buildCountSQL(schemaName, actualTableName));
} catch (final SQLException ex) {
String uniqueKey = dumperConfig.hasUniqueKey() ?
dumperConfig.getUniqueKeyColumns().get(0).getName() : "";
throw new
SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(),
uniqueKey, ex);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
index 5a0dae236ef..f773649c86b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
@@ -38,6 +38,7 @@ import
org.apache.shardingsphere.data.pipeline.common.ingest.position.StringPrim
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.UnsupportedKeyPosition;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataUtils;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
import
org.apache.shardingsphere.data.pipeline.common.util.IntervalToRangeIterator;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.core.dumper.InventoryDumper;
@@ -47,8 +48,6 @@ import
org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsum
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -203,8 +202,8 @@ public final class InventoryTaskSplitter {
private Range<Long> getUniqueKeyValuesRange(final
InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
final InventoryDumperConfiguration dumperConfig) {
String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
- String sql =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class,
jobItemContext.getJobConfig().getSourceDatabaseType())
- .buildUniqueKeyMinMaxValuesSQL(dumperConfig.getSchemaName(new
LogicTableName(dumperConfig.getLogicTableName())),
dumperConfig.getActualTableName(), uniqueKey);
+ PipelineSQLBuilderEngine sqlBuilderEngine = new
PipelineSQLBuilderEngine(jobItemContext.getJobConfig().getSourceDatabaseType());
+ String sql =
sqlBuilderEngine.buildUniqueKeyMinMaxValuesSQL(dumperConfig.getSchemaName(new
LogicTableName(dumperConfig.getLogicTableName())),
dumperConfig.getActualTableName(), uniqueKey);
try (
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
index 7c184645029..fea85711039 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
@@ -24,10 +24,9 @@ import
org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfigur
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.common.metadata.generator.PipelineDDLGenerator;
-import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -56,14 +55,14 @@ public abstract class AbstractDataSourcePreparer implements
DataSourcePreparer {
}
CreateTableConfiguration createTableConfig =
param.getCreateTableConfig();
String defaultSchema =
targetDatabaseType.getDefaultSchema().orElse(null);
- PipelineSQLBuilder sqlBuilder =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, targetDatabaseType);
+ PipelineSQLBuilderEngine sqlBuilderEngine = new
PipelineSQLBuilderEngine(targetDatabaseType);
Collection<String> createdSchemaNames = new HashSet<>();
for (CreateTableEntry each :
createTableConfig.getCreateTableEntries()) {
String targetSchemaName =
each.getTargetName().getSchemaName().getOriginal();
if (null == targetSchemaName ||
targetSchemaName.equalsIgnoreCase(defaultSchema) ||
createdSchemaNames.contains(targetSchemaName)) {
continue;
}
- Optional<String> sql =
sqlBuilder.buildCreateSchemaSQL(targetSchemaName);
+ Optional<String> sql =
sqlBuilderEngine.buildCreateSchemaSQL(targetSchemaName);
if (sql.isPresent()) {
executeCreateSchema(param.getDataSourceManager(),
each.getTargetDataSourceConfig(), sql.get());
createdSchemaNames.add(targetSchemaName);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
index 0dc919fa28a..f4917bec772 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
@@ -19,12 +19,11 @@ package
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.checker
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
import
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
-import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import javax.sql.DataSource;
@@ -68,7 +67,8 @@ public abstract class AbstractDataSourceChecker implements
DataSourceChecker {
private boolean checkEmpty(final DataSource dataSource, final String
schemaName, final String tableName) throws SQLException {
DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, getDatabaseType());
- String sql =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class,
databaseType).buildCheckEmptySQL(schemaName, tableName);
+ PipelineSQLBuilderEngine sqlBuilderEngine = new
PipelineSQLBuilderEngine(databaseType);
+ String sql = sqlBuilderEngine.buildCheckEmptySQL(schemaName,
tableName);
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(sql);
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/FixturePipelineSQLBuilder.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/FixturePipelineSQLBuilder.java
index 58a5f586a30..1177aae74c5 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/FixturePipelineSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/FixturePipelineSQLBuilder.java
@@ -19,38 +19,17 @@ package
org.apache.shardingsphere.data.pipeline.common.sqlbuilder;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
-public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
+public final class FixturePipelineSQLBuilder implements
DialectPipelineSQLBuilder {
@Override
- public String buildDivisibleInventoryDumpSQL(final String schemaName,
final String tableName, final List<String> columnNames, final String uniqueKey)
{
- return "";
- }
-
- @Override
- public String buildDivisibleInventoryDumpSQLNoEnd(final String schemaName,
final String tableName, final List<String> columnNames, final String uniqueKey)
{
- return "";
- }
-
- @Override
- public String buildIndivisibleInventoryDumpSQL(final String schemaName,
final String tableName, final List<String> columnNames, final String uniqueKey)
{
- return "";
- }
-
- @Override
- public String buildInsertSQL(final String schemaName, final DataRecord
dataRecord) {
- return "";
- }
-
- @Override
- public String buildUpdateSQL(final String schemaName, final DataRecord
dataRecord, final Collection<Column> conditionColumns) {
- return "";
+ public boolean isKeyword(final String item) {
+ return false;
}
@Override
@@ -58,51 +37,16 @@ public final class FixturePipelineSQLBuilder implements
PipelineSQLBuilder {
return Collections.emptyList();
}
- @Override
- public String buildDeleteSQL(final String schemaName, final DataRecord
dataRecord, final Collection<Column> conditionColumns) {
- return "";
- }
-
- @Override
- public String buildDropSQL(final String schemaName, final String
tableName) {
- return "";
- }
-
- @Override
- public String buildCountSQL(final String schemaName, final String
tableName) {
- return "";
- }
-
@Override
public Optional<String> buildEstimatedCountSQL(final String schemaName,
final String tableName) {
return Optional.empty();
}
- @Override
- public String buildUniqueKeyMinMaxValuesSQL(final String schemaName, final
String tableName, final String uniqueKey) {
- return "";
- }
-
- @Override
- public String buildQueryAllOrderingSQL(final String schemaName, final
String tableName, final List<String> columnNames, final String uniqueKey, final
boolean firstQuery) {
- return "";
- }
-
- @Override
- public String buildCheckEmptySQL(final String schemaName, final String
tableName) {
- return null;
- }
-
@Override
public Optional<String> buildCRC32SQL(final String schemaName, final
String tableName, final String column) {
return Optional.of(String.format("SELECT CRC32(%s) FROM %s", column,
tableName));
}
- @Override
- public String buildNoUniqueKeyInventoryDumpSQL(final String schemaName,
final String tableName) {
- return "";
- }
-
@Override
public String getDatabaseType() {
return "FIXTURE";
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/H2PipelineSQLBuilder.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/H2PipelineSQLBuilder.java
index 7cb22c98bc4..dc4cf156a32 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/H2PipelineSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/H2PipelineSQLBuilder.java
@@ -17,23 +17,25 @@
package org.apache.shardingsphere.data.pipeline.common.sqlbuilder;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
+import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
-public final class H2PipelineSQLBuilder extends AbstractPipelineSQLBuilder {
+public final class H2PipelineSQLBuilder implements DialectPipelineSQLBuilder {
@Override
- protected boolean isKeyword(final String item) {
+ public boolean isKeyword(final String item) {
return false;
}
@Override
- protected String getLeftIdentifierQuoteString() {
- return "";
- }
-
- @Override
- protected String getRightIdentifierQuoteString() {
- return "";
+ public List<Column> extractUpdatedColumns(final DataRecord dataRecord) {
+ return new ArrayList<>(RecordUtils.extractUpdatedColumns(dataRecord));
}
@Override
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngineTest.java
similarity index 69%
rename from
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderTest.java
rename to
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngineTest.java
index 76f1b4b8ab0..bf5568e0da1 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngineTest.java
@@ -22,7 +22,8 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
import
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
-import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -32,79 +33,79 @@ import java.util.Collections;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-class PipelineSQLBuilderTest {
+class PipelineSQLBuilderEngineTest {
- private final PipelineSQLBuilder pipelineSQLBuilder = new
H2PipelineSQLBuilder();
+ private final PipelineSQLBuilderEngine sqlBuilderEngine = new
PipelineSQLBuilderEngine(TypedSPILoader.getService(DatabaseType.class, "H2"));
@Test
void assertBuildDivisibleInventoryDumpSQL() {
- String actual =
pipelineSQLBuilder.buildDivisibleInventoryDumpSQL(null, "t_order",
Collections.singletonList("*"), "order_id");
+ String actual = sqlBuilderEngine.buildDivisibleInventoryDumpSQL(null,
"t_order", Collections.singletonList("*"), "order_id");
assertThat(actual, is("SELECT * FROM t_order WHERE order_id>=? AND
order_id<=? ORDER BY order_id ASC"));
- actual = pipelineSQLBuilder.buildDivisibleInventoryDumpSQL(null,
"t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
+ actual = sqlBuilderEngine.buildDivisibleInventoryDumpSQL(null,
"t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC"));
}
@Test
void assertBuildDivisibleInventoryDumpSQLNoEnd() {
- String actual =
pipelineSQLBuilder.buildDivisibleInventoryDumpSQLNoEnd(null, "t_order",
Collections.singletonList("*"), "order_id");
+ String actual =
sqlBuilderEngine.buildNoLimitedDivisibleInventoryDumpSQL(null, "t_order",
Collections.singletonList("*"), "order_id");
assertThat(actual, is("SELECT * FROM t_order WHERE order_id>=? ORDER
BY order_id ASC"));
- actual = pipelineSQLBuilder.buildDivisibleInventoryDumpSQLNoEnd(null,
"t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
+ actual =
sqlBuilderEngine.buildNoLimitedDivisibleInventoryDumpSQL(null, "t_order",
Arrays.asList("order_id", "user_id", "status"), "order_id");
assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id>=? ORDER BY order_id ASC"));
}
@Test
void assertBuildIndivisibleInventoryDumpSQL() {
- String actual =
pipelineSQLBuilder.buildIndivisibleInventoryDumpSQL(null, "t_order",
Collections.singletonList("*"), "order_id");
+ String actual =
sqlBuilderEngine.buildIndivisibleInventoryDumpSQL(null, "t_order",
Collections.singletonList("*"), "order_id");
assertThat(actual, is("SELECT * FROM t_order ORDER BY order_id ASC"));
- actual = pipelineSQLBuilder.buildIndivisibleInventoryDumpSQL(null,
"t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
+ actual = sqlBuilderEngine.buildIndivisibleInventoryDumpSQL(null,
"t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
ORDER BY order_id ASC"));
}
@Test
void assertBuildQueryAllOrderingSQLFirstQuery() {
- String actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null,
"t_order", Collections.singletonList("*"), "order_id", true);
+ String actual = sqlBuilderEngine.buildQueryAllOrderingSQL(null,
"t_order", Collections.singletonList("*"), "order_id", true);
assertThat(actual, is("SELECT * FROM t_order ORDER BY order_id ASC"));
- actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order",
Arrays.asList("order_id", "user_id", "status"), "order_id", true);
+ actual = sqlBuilderEngine.buildQueryAllOrderingSQL(null, "t_order",
Arrays.asList("order_id", "user_id", "status"), "order_id", true);
assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
ORDER BY order_id ASC"));
}
@Test
void assertBuildQueryAllOrderingSQLNonFirstQuery() {
- String actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null,
"t_order", Collections.singletonList("*"), "order_id", false);
+ String actual = sqlBuilderEngine.buildQueryAllOrderingSQL(null,
"t_order", Collections.singletonList("*"), "order_id", false);
assertThat(actual, is("SELECT * FROM t_order WHERE order_id>? ORDER BY
order_id ASC"));
- actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order",
Arrays.asList("order_id", "user_id", "status"), "order_id", false);
+ actual = sqlBuilderEngine.buildQueryAllOrderingSQL(null, "t_order",
Arrays.asList("order_id", "user_id", "status"), "order_id", false);
assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id>? ORDER BY order_id ASC"));
}
@Test
void assertBuildInsertSQL() {
- String actual = pipelineSQLBuilder.buildInsertSQL(null,
mockDataRecord("t2"));
+ String actual = sqlBuilderEngine.buildInsertSQL(null,
mockDataRecord("t2"));
assertThat(actual, is("INSERT INTO t2(id,sc,c1,c2,c3)
VALUES(?,?,?,?,?)"));
}
@Test
void assertBuildUpdateSQLWithPrimaryKey() {
- String actual = pipelineSQLBuilder.buildUpdateSQL(null,
mockDataRecord("t2"), RecordUtils.extractPrimaryColumns(mockDataRecord("t2")));
+ String actual = sqlBuilderEngine.buildUpdateSQL(null,
mockDataRecord("t2"), RecordUtils.extractPrimaryColumns(mockDataRecord("t2")));
assertThat(actual, is("UPDATE t2 SET c1 = ?,c2 = ?,c3 = ? WHERE id =
?"));
}
@Test
void assertBuildUpdateSQLWithShardingColumns() {
DataRecord dataRecord = mockDataRecord("t2");
- String actual = pipelineSQLBuilder.buildUpdateSQL(null, dataRecord,
mockConditionColumns(dataRecord));
+ String actual = sqlBuilderEngine.buildUpdateSQL(null, dataRecord,
mockConditionColumns(dataRecord));
assertThat(actual, is("UPDATE t2 SET c1 = ?,c2 = ?,c3 = ? WHERE id = ?
AND sc = ?"));
}
@Test
void assertBuildDeleteSQLWithPrimaryKey() {
- String actual = pipelineSQLBuilder.buildDeleteSQL(null,
mockDataRecord("t3"), RecordUtils.extractPrimaryColumns(mockDataRecord("t3")));
+ String actual = sqlBuilderEngine.buildDeleteSQL(null,
mockDataRecord("t3"), RecordUtils.extractPrimaryColumns(mockDataRecord("t3")));
assertThat(actual, is("DELETE FROM t3 WHERE id = ?"));
}
@Test
void assertBuildDeleteSQLWithConditionColumns() {
DataRecord dataRecord = mockDataRecord("t3");
- String actual = pipelineSQLBuilder.buildDeleteSQL(null, dataRecord,
mockConditionColumns(dataRecord));
+ String actual = sqlBuilderEngine.buildDeleteSQL(null, dataRecord,
mockConditionColumns(dataRecord));
assertThat(actual, is("DELETE FROM t3 WHERE id = ? AND sc = ?"));
}
@@ -124,7 +125,7 @@ class PipelineSQLBuilderTest {
@Test
void assertBuildDeleteSQLWithoutUniqueKey() {
- String actual = pipelineSQLBuilder.buildDeleteSQL(null,
mockDataRecordWithoutUniqueKey("t_order"),
+ String actual = sqlBuilderEngine.buildDeleteSQL(null,
mockDataRecordWithoutUniqueKey("t_order"),
RecordUtils.extractConditionColumns(mockDataRecordWithoutUniqueKey("t_order"),
Collections.emptySet()));
assertThat(actual, is("DELETE FROM t_order WHERE id = ? AND name =
?"));
}
@@ -132,7 +133,7 @@ class PipelineSQLBuilderTest {
@Test
void assertBuildUpdateSQLWithoutShardingColumns() {
DataRecord dataRecord = mockDataRecordWithoutUniqueKey("t_order");
- String actual = pipelineSQLBuilder.buildUpdateSQL(null, dataRecord,
mockConditionColumns(dataRecord));
+ String actual = sqlBuilderEngine.buildUpdateSQL(null, dataRecord,
mockConditionColumns(dataRecord));
assertThat(actual, is("UPDATE t_order SET name = ? WHERE id = ? AND
name = ?"));
}
diff --git
a/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
b/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
similarity index 100%
rename from
kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
rename to
kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index c25ffd99ce6..f0df4901488 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -19,17 +19,21 @@ package
org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.AbstractPipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
* MySQL pipeline SQL builder.
*/
-public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
+public final class MySQLPipelineSQLBuilder implements
DialectPipelineSQLBuilder {
private static final Set<String> RESERVED_KEYWORDS = new
HashSet<>(Arrays.asList(
"ADD", "ALL", "ALTER", "ANALYZE", "AND", "AS", "ASC", "BEFORE",
"BETWEEN", "BIGINT", "BINARY", "BLOB", "BOTH", "BY", "CALL",
@@ -50,26 +54,12 @@ public final class MySQLPipelineSQLBuilder extends
AbstractPipelineSQLBuilder {
"WHEN", "WHERE", "WHILE", "WINDOW", "WITH", "WRITE", "XOR",
"YEAR_MONTH", "ZEROFILL"));
@Override
- protected boolean isKeyword(final String item) {
+ public boolean isKeyword(final String item) {
return RESERVED_KEYWORDS.contains(item.toUpperCase());
}
@Override
- public String getLeftIdentifierQuoteString() {
- return "`";
- }
-
- @Override
- public String getRightIdentifierQuoteString() {
- return "`";
- }
-
- @Override
- public String buildInsertSQL(final String schemaName, final DataRecord
dataRecord) {
- return super.buildInsertSQL(schemaName, dataRecord) +
buildDuplicateUpdateSQL(dataRecord);
- }
-
- private String buildDuplicateUpdateSQL(final DataRecord dataRecord) {
+ public Optional<String> buildInsertSQLOnDuplicatePart(final String
schemaName, final DataRecord dataRecord) {
StringBuilder result = new StringBuilder(" ON DUPLICATE KEY UPDATE ");
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
Column column = dataRecord.getColumn(i);
@@ -83,7 +73,12 @@ public final class MySQLPipelineSQLBuilder extends
AbstractPipelineSQLBuilder {
result.append(quote(column.getName())).append("=VALUES(").append(quote(column.getName())).append("),");
}
result.setLength(result.length() - 1);
- return result.toString();
+ return Optional.of(result.toString());
+ }
+
+ @Override
+ public List<Column> extractUpdatedColumns(final DataRecord dataRecord) {
+ return new ArrayList<>(RecordUtils.extractUpdatedColumns(dataRecord));
}
@Override
@@ -94,7 +89,11 @@ public final class MySQLPipelineSQLBuilder extends
AbstractPipelineSQLBuilder {
@Override
public Optional<String> buildEstimatedCountSQL(final String schemaName,
final String tableName) {
return Optional.of(String.format("SELECT TABLE_ROWS FROM
INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = '%s'",
- getQualifiedTableName(schemaName, tableName)));
+ new
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName,
tableName)));
+ }
+
+ private String quote(final String item) {
+ return isKeyword(item) ? getType().getQuoteCharacter().wrap(item) :
item;
}
@Override
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
similarity index 100%
rename from
kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
rename to
kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index ec234b30f1a..de50f74f720 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -34,15 +34,15 @@ class MySQLPipelineSQLBuilderTest {
private final MySQLPipelineSQLBuilder sqlBuilder = new
MySQLPipelineSQLBuilder();
@Test
- void assertBuildInsertSQL() {
- String actual = sqlBuilder.buildInsertSQL(null, mockDataRecord("t1"));
- assertThat(actual, is("INSERT INTO t1(id,sc,c1,c2,c3)
VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE
c1=VALUES(c1),c2=VALUES(c2),c3=VALUES(c3)"));
+ void assertBuildInsertSQLOnDuplicatePart() {
+ String actual = sqlBuilder.buildInsertSQLOnDuplicatePart(null,
mockDataRecord("t1")).orElse(null);
+ assertThat(actual, is(" ON DUPLICATE KEY UPDATE
c1=VALUES(c1),c2=VALUES(c2),c3=VALUES(c3)"));
}
@Test
- void assertBuildInsertSQLHasShardingColumn() {
- String actual = sqlBuilder.buildInsertSQL(null, mockDataRecord("t2"));
- assertThat(actual, is("INSERT INTO t2(id,sc,c1,c2,c3)
VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE
c1=VALUES(c1),c2=VALUES(c2),c3=VALUES(c3)"));
+ void assertBuildInsertSQLOnDuplicatePartHasShardingColumn() {
+ String actual = sqlBuilder.buildInsertSQLOnDuplicatePart(null,
mockDataRecord("t2")).orElse(null);
+ assertThat(actual, is(" ON DUPLICATE KEY UPDATE
c1=VALUES(c1),c2=VALUES(c2),c3=VALUES(c3)"));
}
@Test
@@ -62,15 +62,6 @@ class MySQLPipelineSQLBuilderTest {
return result;
}
- @Test
- void assertQuoteKeyword() {
- String tableName = "CASCADE";
- String actualCountSql = sqlBuilder.buildCountSQL(null, tableName);
- assertThat(actualCountSql, is(String.format("SELECT COUNT(*) FROM %s",
sqlBuilder.quote(tableName))));
- actualCountSql = sqlBuilder.buildCountSQL(null,
tableName.toLowerCase());
- assertThat(actualCountSql, is(String.format("SELECT COUNT(*) FROM %s",
sqlBuilder.quote(tableName.toLowerCase()))));
- }
-
@Test
void assertBuilderEstimateCountSQLWithoutKeyword() {
Optional<String> actual = sqlBuilder.buildEstimatedCountSQL(null,
"t_order");
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 4e0fc38ab09..ddf0819d33a 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -19,7 +19,8 @@ package
org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.AbstractPipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
import java.util.Arrays;
import java.util.HashSet;
@@ -31,7 +32,7 @@ import java.util.stream.Collectors;
/**
* Pipeline SQL builder of openGauss.
*/
-public final class OpenGaussPipelineSQLBuilder extends
AbstractPipelineSQLBuilder {
+public final class OpenGaussPipelineSQLBuilder implements
DialectPipelineSQLBuilder {
private static final Set<String> RESERVED_KEYWORDS = new
HashSet<>(Arrays.asList(
"ALL", "ANALYSE", "ANALYZE", "AND", "ANY", "ARRAY", "AS", "ASC",
"ASYMMETRIC", "AUTHID", "AUTHORIZATION", "BETWEEN", "BIGINT",
@@ -48,36 +49,17 @@ public final class OpenGaussPipelineSQLBuilder extends
AbstractPipelineSQLBuilde
"XMLFOREST", "XMLPARSE", "XMLPI", "XMLROOT", "XMLSERIALIZE"));
@Override
- protected boolean isKeyword(final String item) {
+ public boolean isKeyword(final String item) {
return RESERVED_KEYWORDS.contains(item.toUpperCase());
}
- @Override
- protected String getLeftIdentifierQuoteString() {
- return "\"";
- }
-
- @Override
- protected String getRightIdentifierQuoteString() {
- return "\"";
- }
-
@Override
public Optional<String> buildCreateSchemaSQL(final String schemaName) {
return Optional.of(String.format("CREATE SCHEMA %s",
quote(schemaName)));
}
@Override
- public String buildInsertSQL(final String schemaName, final DataRecord
dataRecord) {
- return super.buildInsertSQL(schemaName, dataRecord) +
buildConflictSQL(dataRecord);
- }
-
- @Override
- public List<Column> extractUpdatedColumns(final DataRecord dataRecord) {
- return dataRecord.getColumns().stream().filter(each ->
!(each.isUniqueKey())).collect(Collectors.toList());
- }
-
- private String buildConflictSQL(final DataRecord dataRecord) {
+ public Optional<String> buildInsertSQLOnDuplicatePart(final String
schemaName, final DataRecord dataRecord) {
StringBuilder result = new StringBuilder(" ON DUPLICATE KEY UPDATE ");
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
Column column = dataRecord.getColumn(i);
@@ -87,13 +69,22 @@ public final class OpenGaussPipelineSQLBuilder extends
AbstractPipelineSQLBuilde
result.append(quote(column.getName())).append("=EXCLUDED.").append(quote(column.getName())).append(',');
}
result.setLength(result.length() - 1);
- return result.toString();
+ return Optional.of(result.toString());
+ }
+
+ @Override
+ public List<Column> extractUpdatedColumns(final DataRecord dataRecord) {
+ return dataRecord.getColumns().stream().filter(each ->
!(each.isUniqueKey())).collect(Collectors.toList());
}
@Override
public Optional<String> buildEstimatedCountSQL(final String schemaName,
final String tableName) {
- String qualifiedTableName = getQualifiedTableName(schemaName,
tableName);
- return Optional.of(String.format("SELECT reltuples::integer FROM
pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
+ return Optional.of(String.format("SELECT reltuples::integer FROM
pg_class WHERE oid='%s'::regclass::oid;",
+ new
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName,
tableName)));
+ }
+
+ private String quote(final String item) {
+ return isKeyword(item) ? getType().getQuoteCharacter().wrap(item) :
item;
}
@Override
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
similarity index 100%
rename from
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
rename to
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
index eae1df8aa3a..794bb029b4c 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
@@ -23,20 +23,17 @@ import
org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeTyp
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
import org.junit.jupiter.api.Test;
-import java.util.Optional;
-
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
class OpenGaussPipelineSQLBuilderTest {
private final OpenGaussPipelineSQLBuilder sqlBuilder = new
OpenGaussPipelineSQLBuilder();
@Test
- void assertBuildInsertSQL() {
- String actual = sqlBuilder.buildInsertSQL(null, mockDataRecord("t1"));
- assertThat(actual, is("INSERT INTO t1(id,c0,c1,c2,c3)
VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE
c0=EXCLUDED.c0,c1=EXCLUDED.c1,c2=EXCLUDED.c2,c3=EXCLUDED.c3"));
+ void assertBuildInsertSQLOnDuplicatePart() {
+ String actual = sqlBuilder.buildInsertSQLOnDuplicatePart(null,
mockDataRecord("t1")).orElse(null);
+ assertThat(actual, is(" ON DUPLICATE KEY UPDATE
c0=EXCLUDED.c0,c1=EXCLUDED.c1,c2=EXCLUDED.c2,c3=EXCLUDED.c3"));
}
private DataRecord mockDataRecord(final String tableName) {
@@ -48,21 +45,4 @@ class OpenGaussPipelineSQLBuilderTest {
result.addColumn(new Column("c3", "", true, false));
return result;
}
-
- @Test
- void assertQuoteKeyword() {
- String schemaName = "RECYCLEBIN";
- Optional<String> actualCreateSchemaSql =
sqlBuilder.buildCreateSchemaSQL(schemaName);
- assertTrue(actualCreateSchemaSql.isPresent());
- assertThat(actualCreateSchemaSql.get(), is(String.format("CREATE
SCHEMA %s", sqlBuilder.quote(schemaName))));
- String actualDropSQL = sqlBuilder.buildDropSQL(schemaName, "ALL");
- String expectedDropSQL = String.format("DROP TABLE IF EXISTS %s",
String.join(".", sqlBuilder.quote(schemaName), sqlBuilder.quote("ALL")));
- assertThat(actualDropSQL, is(expectedDropSQL));
- }
-
- @Test
- void assertBuilderDropSQLWithoutKeyword() {
- String actualDropSQL = sqlBuilder.buildDropSQL("test_normal",
"t_order");
- assertThat(actualDropSQL, is("DROP TABLE IF EXISTS
test_normal.t_order"));
- }
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 6a24ed29d86..3b3d05c90ad 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -20,17 +20,20 @@ package
org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.AbstractPipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
* PostgreSQL pipeline SQL builder.
*/
-public final class PostgreSQLPipelineSQLBuilder extends
AbstractPipelineSQLBuilder {
+public final class PostgreSQLPipelineSQLBuilder implements
DialectPipelineSQLBuilder {
private static final Set<String> RESERVED_KEYWORDS = new
HashSet<>(Arrays.asList(
"ALL", "ANALYSE", "ANALYZE", "AND", "ANY", "ARRAY", "AS", "ASC",
"ASYMMETRIC", "AUTHORIZATION", "BETWEEN", "BIGINT", "BINARY",
@@ -44,33 +47,22 @@ public final class PostgreSQLPipelineSQLBuilder extends
AbstractPipelineSQLBuild
"WINDOW", "WITH", "XMLATTRIBUTES", "XMLCONCAT", "XMLELEMENT",
"XMLEXISTS", "XMLFOREST", "XMLNAMESPACES", "XMLPARSE", "XMLPI", "XMLROOT",
"XMLSERIALIZE", "XMLTABLE"));
@Override
- protected boolean isKeyword(final String item) {
+ public boolean isKeyword(final String item) {
return RESERVED_KEYWORDS.contains(item.toUpperCase());
}
- @Override
- protected String getLeftIdentifierQuoteString() {
- return "\"";
- }
-
- @Override
- protected String getRightIdentifierQuoteString() {
- return "\"";
- }
-
@Override
public Optional<String> buildCreateSchemaSQL(final String schemaName) {
return Optional.of(String.format("CREATE SCHEMA IF NOT EXISTS %s",
quote(schemaName)));
}
@Override
- public String buildInsertSQL(final String schemaName, final DataRecord
dataRecord) {
- String result = super.buildInsertSQL(schemaName, dataRecord);
+ public Optional<String> buildInsertSQLOnDuplicatePart(final String
schemaName, final DataRecord dataRecord) {
// TODO without unique key, job has been interrupted, which may lead
to data duplication
if (dataRecord.getUniqueKeyValue().isEmpty()) {
- return result;
+ return Optional.empty();
}
- return result + buildConflictSQL(dataRecord);
+ return Optional.of(buildConflictSQL(dataRecord));
}
// Refer to https://www.postgresql.org/docs/current/sql-insert.html
@@ -92,10 +84,19 @@ public final class PostgreSQLPipelineSQLBuilder extends
AbstractPipelineSQLBuild
return result.toString();
}
+ @Override
+ public List<Column> extractUpdatedColumns(final DataRecord dataRecord) {
+ return new ArrayList<>(RecordUtils.extractUpdatedColumns(dataRecord));
+ }
+
@Override
public Optional<String> buildEstimatedCountSQL(final String schemaName,
final String tableName) {
- String qualifiedTableName = getQualifiedTableName(schemaName,
tableName);
- return Optional.of(String.format("SELECT reltuples::integer FROM
pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
+ return Optional.of(String.format("SELECT reltuples::integer FROM
pg_class WHERE oid='%s'::regclass::oid;",
+ new
PipelineSQLBuilderEngine(getType()).getQualifiedTableName(schemaName,
tableName)));
+ }
+
+ private String quote(final String item) {
+ return isKeyword(item) ? getType().getQuoteCharacter().wrap(item) :
item;
}
@Override
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
b/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
similarity index 100%
rename from
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
rename to
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index a4f8f8b2f9a..ba4f6f6ef1f 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -25,21 +25,17 @@ import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.Post
import org.junit.jupiter.api.Test;
import org.postgresql.replication.LogSequenceNumber;
-import java.util.Optional;
-
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
class PostgreSQLPipelineSQLBuilderTest {
private final PostgreSQLPipelineSQLBuilder sqlBuilder = new
PostgreSQLPipelineSQLBuilder();
@Test
- void assertBuildInsertSQL() {
- String actual = sqlBuilder.buildInsertSQL("schema1", mockDataRecord());
- assertThat(actual, is("INSERT INTO
schema1.t_order(order_id,user_id,status) VALUES(?,?,?) ON CONFLICT (order_id)"
- + " DO UPDATE SET
user_id=EXCLUDED.user_id,status=EXCLUDED.status"));
+ void assertBuildInsertSQLOnDuplicatePart() {
+ String actual = sqlBuilder.buildInsertSQLOnDuplicatePart("schema1",
mockDataRecord()).orElse(null);
+ assertThat(actual, is(" ON CONFLICT (order_id) DO UPDATE SET
user_id=EXCLUDED.user_id,status=EXCLUDED.status"));
}
private DataRecord mockDataRecord() {
@@ -49,21 +45,4 @@ class PostgreSQLPipelineSQLBuilderTest {
result.addColumn(new Column("status", "ok", true, false));
return result;
}
-
- @Test
- void assertQuoteKeyword() {
- String schemaName = "all";
- Optional<String> actualCreateSchemaSql =
sqlBuilder.buildCreateSchemaSQL(schemaName);
- assertTrue(actualCreateSchemaSql.isPresent());
- assertThat(actualCreateSchemaSql.get(), is(String.format("CREATE
SCHEMA IF NOT EXISTS %s", sqlBuilder.quote(schemaName))));
- String actualDropSQL = sqlBuilder.buildDropSQL(schemaName, "ALL");
- String expectedDropSQL = String.format("DROP TABLE IF EXISTS %s",
String.join(".", sqlBuilder.quote(schemaName), sqlBuilder.quote("ALL")));
- assertThat(actualDropSQL, is(expectedDropSQL));
- }
-
- @Test
- void assertBuilderDropSQLWithoutKeyword() {
- String actualDropSQL = sqlBuilder.buildDropSQL("test_normal",
"t_order");
- assertThat(actualDropSQL, is("DROP TABLE IF EXISTS
test_normal.t_order"));
- }
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 23e411f4fba..63c85329450 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -47,11 +47,12 @@ import
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineCo
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
+import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
import
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
@@ -72,8 +73,6 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -396,14 +395,14 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
private void cleanTempTableOnRollback(final String jobId) throws
SQLException {
MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
- PipelineSQLBuilder pipelineSQLBuilder =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class,
jobConfig.getTargetDatabaseType());
+ PipelineSQLBuilderEngine sqlBuilderEngine = new
PipelineSQLBuilderEngine(jobConfig.getTargetDatabaseType());
TableNameSchemaNameMapping mapping = new
TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
try (
PipelineDataSourceWrapper dataSource =
PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
Connection connection = dataSource.getConnection()) {
for (String each : jobConfig.getTargetTableNames()) {
String targetSchemaName = mapping.getSchemaName(each);
- String sql = pipelineSQLBuilder.buildDropSQL(targetSchemaName,
each);
+ String sql = sqlBuilderEngine.buildDropSQL(targetSchemaName,
each);
log.info("cleanTempTableOnRollback, targetSchemaName={},
targetTableName={}, sql={}", targetSchemaName, each, sql);
try (Statement statement = connection.createStatement()) {
statement.execute(sql);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
index b3d36544b6d..68b92dd0709 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
@@ -17,25 +17,25 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.AbstractPipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
+import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
-public final class H2PipelineSQLBuilder extends AbstractPipelineSQLBuilder {
+public final class H2PipelineSQLBuilder implements DialectPipelineSQLBuilder {
@Override
- protected boolean isKeyword(final String item) {
+ public boolean isKeyword(final String item) {
return false;
}
@Override
- protected String getLeftIdentifierQuoteString() {
- return "";
- }
-
- @Override
- protected String getRightIdentifierQuoteString() {
- return "";
+ public List<Column> extractUpdatedColumns(final DataRecord dataRecord) {
+ return new ArrayList<>(RecordUtils.extractUpdatedColumns(dataRecord));
}
@Override
diff --git
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
similarity index 100%
rename from
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
rename to
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder