This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7604bfaa262 Rename PipelineSQLBuilderEngine to CommonPipelineSQLBuilder (#27206)
7604bfaa262 is described below
commit 7604bfaa262728478babfce8633a247766084acd
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Jul 15 08:26:17 2023 +0800
Rename PipelineSQLBuilderEngine to CommonPipelineSQLBuilder (#27206)
* Remove PipelineSQLBuilderEngine
* Add PipelineImportSQLBuilder
* Refactor PipelineSQLBuilderEngine
* Refactor PipelineSQLBuilderEngine
* Rename CommonPipelineSQLBuilder
* Remove useless H2PipelineSQLBuilder
* Remove useless H2PipelineSQLBuilder
---
...erEngine.java => CommonPipelineSQLBuilder.java} | 25 +++++--------
...RC32MatchDataConsistencyCalculateAlgorithm.java | 10 +++---
...DataMatchDataConsistencyCalculateAlgorithm.java | 6 ++--
.../data/pipeline/core/dumper/InventoryDumper.java | 6 ++--
.../core/importer/sink/PipelineDataSourceSink.java | 12 +++----
.../preparer/InventoryRecordsCountCalculator.java | 10 +++---
.../core/preparer/InventoryTaskSplitter.java | 6 ++--
.../datasource/AbstractDataSourcePreparer.java | 6 ++--
.../checker/AbstractDataSourceChecker.java | 6 ++--
...Test.java => CommonPipelineSQLBuilderTest.java} | 12 +++----
.../sqlbuilder/PipelineImportSQLBuilderTest.java | 2 +-
.../fixture/FixturePipelineSQLBuilder.java | 2 +-
.../sqlbuilder/fixture/H2PipelineSQLBuilder.java | 41 ----------------------
.../datasource/AbstractDataSourceCheckerTest.java | 2 +-
...peline.spi.sqlbuilder.DialectPipelineSQLBuilder | 1 -
.../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java | 12 +++----
.../migration/api/impl/MigrationJobAPI.java | 6 ++--
.../core/fixture/H2PipelineSQLBuilder.java | 3 +-
18 files changed, 57 insertions(+), 111 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/CommonPipelineSQLBuilder.java
similarity index 85%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/CommonPipelineSQLBuilder.java
index 3031f3ed8ad..f81575f1319 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/CommonPipelineSQLBuilder.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.common.sqlbuilder;
-import lombok.Getter;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
@@ -27,23 +26,15 @@ import java.util.Optional;
import java.util.stream.Collectors;
/**
- * Pipeline SQL builder engine.
+ * Common pipeline SQL builder.
*/
-public final class PipelineSQLBuilderEngine {
-
- @Getter
- private final PipelineInventoryDumpSQLBuilder inventoryDumpSQLBuilder;
-
- @Getter
- private final PipelineImportSQLBuilder importSQLBuilder;
+public final class CommonPipelineSQLBuilder {
private final DialectPipelineSQLBuilder dialectSQLBuilder;
private final PipelineSQLSegmentBuilder sqlSegmentBuilder;
- public PipelineSQLBuilderEngine(final DatabaseType databaseType) {
- inventoryDumpSQLBuilder = new
PipelineInventoryDumpSQLBuilder(databaseType);
- importSQLBuilder = new PipelineImportSQLBuilder(databaseType);
+ public CommonPipelineSQLBuilder(final DatabaseType databaseType) {
dialectSQLBuilder =
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class,
databaseType);
sqlSegmentBuilder = new PipelineSQLSegmentBuilder(databaseType);
}
@@ -100,8 +91,8 @@ public final class PipelineSQLBuilderEngine {
* @return min max unique key SQL
*/
public String buildUniqueKeyMinMaxValuesSQL(final String schemaName, final
String tableName, final String uniqueKey) {
- String quotedUniqueKey =
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
- return String.format("SELECT MIN(%s), MAX(%s) FROM %s",
quotedUniqueKey, quotedUniqueKey,
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName));
+ String escapedUniqueKey =
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
+ return String.format("SELECT MIN(%s), MAX(%s) FROM %s",
escapedUniqueKey, escapedUniqueKey,
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName));
}
/**
@@ -116,10 +107,10 @@ public final class PipelineSQLBuilderEngine {
*/
public String buildQueryAllOrderingSQL(final String schemaName, final
String tableName, final List<String> columnNames, final String uniqueKey, final
boolean firstQuery) {
String qualifiedTableName =
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
- String quotedUniqueKey =
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
+ String escapedUniqueKey =
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
return firstQuery
- ? String.format("SELECT %s FROM %s ORDER BY %s ASC",
buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey)
- : String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s
ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey,
quotedUniqueKey);
+ ? String.format("SELECT %s FROM %s ORDER BY %s ASC",
buildQueryColumns(columnNames), qualifiedTableName, escapedUniqueKey)
+ : String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s
ASC", buildQueryColumns(columnNames), qualifiedTableName, escapedUniqueKey,
escapedUniqueKey);
}
private String buildQueryColumns(final List<String> columnNames) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index 58b52de71f2..3e578eb487d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.algorithm;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.CommonPipelineSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataConsistencyCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
@@ -53,13 +53,13 @@ public final class
CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractD
@Override
public Iterable<DataConsistencyCalculatedResult> calculate(final
DataConsistencyCalculateParameter param) {
DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, param.getDatabaseType());
- PipelineSQLBuilderEngine sqlBuilderEngine = new
PipelineSQLBuilderEngine(databaseType);
- List<CalculatedItem> calculatedItems =
param.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilderEngine,
param, each)).collect(Collectors.toList());
+ CommonPipelineSQLBuilder pipelineSQLBuilder = new
CommonPipelineSQLBuilder(databaseType);
+ List<CalculatedItem> calculatedItems =
param.getColumnNames().stream().map(each -> calculateCRC32(pipelineSQLBuilder,
param, each)).collect(Collectors.toList());
return Collections.singletonList(new
CalculatedResult(calculatedItems.get(0).getRecordsCount(),
calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
}
- private CalculatedItem calculateCRC32(final PipelineSQLBuilderEngine
sqlBuilderEngine, final DataConsistencyCalculateParameter param, final String
columnName) {
- Optional<String> sql =
sqlBuilderEngine.buildCRC32SQL(param.getSchemaName(),
param.getLogicTableName(), columnName);
+ private CalculatedItem calculateCRC32(final CommonPipelineSQLBuilder
pipelineSQLBuilder, final DataConsistencyCalculateParameter param, final String
columnName) {
+ Optional<String> sql =
pipelineSQLBuilder.buildCRC32SQL(param.getSchemaName(),
param.getLogicTableName(), columnName);
ShardingSpherePreconditions.checkState(sql.isPresent(), () -> new
UnsupportedCRC32DataConsistencyCalculateAlgorithmException(param.getDatabaseType()));
try (
Connection connection = param.getDataSource().getConnection();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 929b2d8d494..ff314ef957d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.algorithm;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.CommonPipelineSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataConsistencyCalculatedResult;
@@ -164,9 +164,9 @@ public final class
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
throw new UnsupportedOperationException("Data consistency of
DATA_MATCH type not support table without unique key and primary key now");
}
DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, param.getDatabaseType());
- PipelineSQLBuilderEngine sqlBuilderEngine = new
PipelineSQLBuilderEngine(databaseType);
+ CommonPipelineSQLBuilder pipelineSQLBuilder = new
CommonPipelineSQLBuilder(databaseType);
boolean firstQuery = null == param.getTableCheckPosition();
- return
sqlBuilderEngine.buildQueryAllOrderingSQL(param.getSchemaName(),
param.getLogicTableName(), param.getColumnNames(),
param.getUniqueKey().getName(), firstQuery);
+ return
pipelineSQLBuilder.buildQueryAllOrderingSQL(param.getSchemaName(),
param.getLogicTableName(), param.getColumnNames(),
param.getUniqueKey().getName(), firstQuery);
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index 124e9be7b9c..fd0d2a35c25 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -41,7 +41,6 @@ import
org.apache.shardingsphere.data.pipeline.common.ingest.position.Placeholde
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.PrimaryKeyPosition;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.PrimaryKeyPositionFactory;
import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineInventoryDumpSQLBuilder;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
import
org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
@@ -77,7 +76,7 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
private final DataSource dataSource;
- private final PipelineSQLBuilderEngine sqlBuilderEngine;
+ private final PipelineInventoryDumpSQLBuilder inventoryDumpSQLBuilder;
private final ColumnValueReaderEngine columnValueReaderEngine;
@@ -90,7 +89,7 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
this.channel = channel;
this.dataSource = dataSource;
DatabaseType databaseType =
dumperConfig.getDataSourceConfig().getDatabaseType();
- sqlBuilderEngine = new PipelineSQLBuilderEngine(databaseType);
+ inventoryDumpSQLBuilder = new
PipelineInventoryDumpSQLBuilder(databaseType);
columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
this.metaDataLoader = metaDataLoader;
}
@@ -156,7 +155,6 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
if (!Strings.isNullOrEmpty(dumperConfig.getQuerySQL())) {
return dumperConfig.getQuerySQL();
}
- PipelineInventoryDumpSQLBuilder inventoryDumpSQLBuilder =
sqlBuilderEngine.getInventoryDumpSQLBuilder();
LogicTableName logicTableName = new
LogicTableName(dumperConfig.getLogicTableName());
String schemaName = dumperConfig.getSchemaName(logicTableName);
if (!dumperConfig.hasUniqueKey()) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index 9b05f29f83b..d4d8774c790 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -32,7 +32,7 @@ import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
import
org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
import
org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineImportSQLBuilder;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
import org.apache.shardingsphere.data.pipeline.core.importer.DataRecordMerger;
@@ -66,7 +66,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
private final JobRateLimitAlgorithm rateLimitAlgorithm;
- private final PipelineSQLBuilderEngine sqlBuilderEngine;
+ private final PipelineImportSQLBuilder importSQLBuilder;
private final AtomicReference<Statement> batchInsertStatement = new
AtomicReference<>();
@@ -78,7 +78,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
this.importerConfig = importerConfig;
this.dataSourceManager = dataSourceManager;
rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
- sqlBuilderEngine = new
PipelineSQLBuilderEngine(importerConfig.getDataSourceConfig().getDatabaseType());
+ importSQLBuilder = new
PipelineImportSQLBuilder(importerConfig.getDataSourceConfig().getDatabaseType());
}
@Override
@@ -201,7 +201,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
private void executeBatchInsert(final Connection connection, final
List<DataRecord> dataRecords) throws SQLException {
DataRecord dataRecord = dataRecords.get(0);
- String insertSql =
sqlBuilderEngine.getImportSQLBuilder().buildInsertSQL(getSchemaName(dataRecord.getTableName()),
dataRecord);
+ String insertSql =
importSQLBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()),
dataRecord);
try (PreparedStatement preparedStatement =
connection.prepareStatement(insertSql)) {
batchInsertStatement.set(preparedStatement);
preparedStatement.setQueryTimeout(30);
@@ -231,7 +231,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
Set<String> shardingColumns =
importerConfig.getShardingColumns(dataRecord.getTableName());
List<Column> conditionColumns =
RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
List<Column> setColumns =
dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
- String updateSql =
sqlBuilderEngine.getImportSQLBuilder().buildUpdateSQL(getSchemaName(dataRecord.getTableName()),
dataRecord, conditionColumns);
+ String updateSql =
importSQLBuilder.buildUpdateSQL(getSchemaName(dataRecord.getTableName()),
dataRecord, conditionColumns);
try (PreparedStatement preparedStatement =
connection.prepareStatement(updateSql)) {
updateStatement.set(preparedStatement);
for (int i = 0; i < setColumns.size(); i++) {
@@ -258,7 +258,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
private void executeBatchDelete(final Connection connection, final
List<DataRecord> dataRecords) throws SQLException {
DataRecord dataRecord = dataRecords.get(0);
- String deleteSQL =
sqlBuilderEngine.getImportSQLBuilder().buildDeleteSQL(getSchemaName(dataRecord.getTableName()),
dataRecord,
+ String deleteSQL =
importSQLBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()),
dataRecord,
RecordUtils.extractConditionColumns(dataRecord,
importerConfig.getShardingColumns(dataRecord.getTableName())));
try (PreparedStatement preparedStatement =
connection.prepareStatement(deleteSQL)) {
batchDeleteStatement.set(preparedStatement);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
index 5b77b34d244..597e9c3d173 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
@@ -23,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.CommonPipelineSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
@@ -54,15 +54,15 @@ public final class InventoryRecordsCountCalculator {
public static long getTableRecordsCount(final InventoryDumperConfiguration
dumperConfig, final PipelineDataSourceWrapper dataSource) {
String schemaName = dumperConfig.getSchemaName(new
LogicTableName(dumperConfig.getLogicTableName()));
String actualTableName = dumperConfig.getActualTableName();
- PipelineSQLBuilderEngine sqlBuilderEngine = new
PipelineSQLBuilderEngine(dataSource.getDatabaseType());
- Optional<String> sql =
sqlBuilderEngine.buildEstimatedCountSQL(schemaName, actualTableName);
+ CommonPipelineSQLBuilder pipelineSQLBuilder = new
CommonPipelineSQLBuilder(dataSource.getDatabaseType());
+ Optional<String> sql =
pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
try {
if (sql.isPresent()) {
DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class,
dataSource.getDatabaseType().getType());
long result = getEstimatedCount(databaseType, dataSource,
sql.get());
- return result > 0 ? result : getCount(dataSource,
sqlBuilderEngine.buildCountSQL(schemaName, actualTableName));
+ return result > 0 ? result : getCount(dataSource,
pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
}
- return getCount(dataSource,
sqlBuilderEngine.buildCountSQL(schemaName, actualTableName));
+ return getCount(dataSource,
pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
} catch (final SQLException ex) {
String uniqueKey = dumperConfig.hasUniqueKey() ?
dumperConfig.getUniqueKeyColumns().get(0).getName() : "";
throw new
SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(),
uniqueKey, ex);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
index 9461e8bfd80..5f5283c8054 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
@@ -37,7 +37,7 @@ import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.St
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.UnsupportedKeyPosition;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataUtils;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.CommonPipelineSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.common.util.IntervalToRangeIterator;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.core.dumper.InventoryDumper;
@@ -201,8 +201,8 @@ public final class InventoryTaskSplitter {
private Range<Long> getUniqueKeyValuesRange(final
InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
final InventoryDumperConfiguration dumperConfig) {
String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
- PipelineSQLBuilderEngine sqlBuilderEngine = new
PipelineSQLBuilderEngine(jobItemContext.getJobConfig().getSourceDatabaseType());
- String sql =
sqlBuilderEngine.buildUniqueKeyMinMaxValuesSQL(dumperConfig.getSchemaName(new
LogicTableName(dumperConfig.getLogicTableName())),
dumperConfig.getActualTableName(), uniqueKey);
+ CommonPipelineSQLBuilder pipelineSQLBuilder = new
CommonPipelineSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
+ String sql =
pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(dumperConfig.getSchemaName(new
LogicTableName(dumperConfig.getLogicTableName())),
dumperConfig.getActualTableName(), uniqueKey);
try (
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
index fea85711039..e2aacaf564b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
@@ -24,7 +24,7 @@ import
org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfigur
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.common.metadata.generator.PipelineDDLGenerator;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.CommonPipelineSQLBuilder;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
@@ -55,14 +55,14 @@ public abstract class AbstractDataSourcePreparer implements
DataSourcePreparer {
}
CreateTableConfiguration createTableConfig =
param.getCreateTableConfig();
String defaultSchema =
targetDatabaseType.getDefaultSchema().orElse(null);
- PipelineSQLBuilderEngine sqlBuilderEngine = new
PipelineSQLBuilderEngine(targetDatabaseType);
+ CommonPipelineSQLBuilder pipelineSQLBuilder = new
CommonPipelineSQLBuilder(targetDatabaseType);
Collection<String> createdSchemaNames = new HashSet<>();
for (CreateTableEntry each :
createTableConfig.getCreateTableEntries()) {
String targetSchemaName =
each.getTargetName().getSchemaName().getOriginal();
if (null == targetSchemaName ||
targetSchemaName.equalsIgnoreCase(defaultSchema) ||
createdSchemaNames.contains(targetSchemaName)) {
continue;
}
- Optional<String> sql =
sqlBuilderEngine.buildCreateSchemaSQL(targetSchemaName);
+ Optional<String> sql =
pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
if (sql.isPresent()) {
executeCreateSchema(param.getDataSourceManager(),
each.getTargetDataSourceConfig(), sql.get());
createdSchemaNames.add(targetSchemaName);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
index f4917bec772..26f258e296c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.checker
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.CommonPipelineSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
import
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
@@ -67,8 +67,8 @@ public abstract class AbstractDataSourceChecker implements
DataSourceChecker {
private boolean checkEmpty(final DataSource dataSource, final String
schemaName, final String tableName) throws SQLException {
DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, getDatabaseType());
- PipelineSQLBuilderEngine sqlBuilderEngine = new
PipelineSQLBuilderEngine(databaseType);
- String sql = sqlBuilderEngine.buildCheckEmptySQL(schemaName,
tableName);
+ CommonPipelineSQLBuilder pipelineSQLBuilder = new
CommonPipelineSQLBuilder(databaseType);
+ String sql = pipelineSQLBuilder.buildCheckEmptySQL(schemaName,
tableName);
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(sql);
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngineTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/CommonPipelineSQLBuilderTest.java
similarity index 70%
rename from
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngineTest.java
rename to
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/CommonPipelineSQLBuilderTest.java
index 4e0ea46efba..0e0903e2c77 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngineTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/CommonPipelineSQLBuilderTest.java
@@ -27,23 +27,23 @@ import java.util.Collections;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-class PipelineSQLBuilderEngineTest {
+class CommonPipelineSQLBuilderTest {
- private final PipelineSQLBuilderEngine sqlBuilderEngine = new
PipelineSQLBuilderEngine(TypedSPILoader.getService(DatabaseType.class, "H2"));
+ private final CommonPipelineSQLBuilder pipelineSQLBuilder = new
CommonPipelineSQLBuilder(TypedSPILoader.getService(DatabaseType.class,
"FIXTURE"));
@Test
void assertBuildQueryAllOrderingSQLFirstQuery() {
- String actual = sqlBuilderEngine.buildQueryAllOrderingSQL(null,
"t_order", Collections.singletonList("*"), "order_id", true);
+ String actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null,
"t_order", Collections.singletonList("*"), "order_id", true);
assertThat(actual, is("SELECT * FROM t_order ORDER BY order_id ASC"));
- actual = sqlBuilderEngine.buildQueryAllOrderingSQL(null, "t_order",
Arrays.asList("order_id", "user_id", "status"), "order_id", true);
+ actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order",
Arrays.asList("order_id", "user_id", "status"), "order_id", true);
assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
ORDER BY order_id ASC"));
}
@Test
void assertBuildQueryAllOrderingSQLNonFirstQuery() {
- String actual = sqlBuilderEngine.buildQueryAllOrderingSQL(null,
"t_order", Collections.singletonList("*"), "order_id", false);
+ String actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null,
"t_order", Collections.singletonList("*"), "order_id", false);
assertThat(actual, is("SELECT * FROM t_order WHERE order_id>? ORDER BY
order_id ASC"));
- actual = sqlBuilderEngine.buildQueryAllOrderingSQL(null, "t_order",
Arrays.asList("order_id", "user_id", "status"), "order_id", false);
+ actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order",
Arrays.asList("order_id", "user_id", "status"), "order_id", false);
assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id>? ORDER BY order_id ASC"));
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineImportSQLBuilderTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineImportSQLBuilderTest.java
index 4691b91b70d..6a1cc7808fc 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineImportSQLBuilderTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineImportSQLBuilderTest.java
@@ -34,7 +34,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
class PipelineImportSQLBuilderTest {
- private final PipelineImportSQLBuilder importSQLBuilder = new
PipelineImportSQLBuilder(TypedSPILoader.getService(DatabaseType.class, "H2"));
+ private final PipelineImportSQLBuilder importSQLBuilder = new
PipelineImportSQLBuilder(TypedSPILoader.getService(DatabaseType.class,
"FIXTURE"));
@Test
void assertBuildInsertSQL() {
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
index 1883a186bc3..2f44ec6f809 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
@@ -25,7 +25,7 @@ public final class FixturePipelineSQLBuilder implements
DialectPipelineSQLBuilde
@Override
public String buildCheckEmptySQL(final String schemaName, final String
tableName) {
- return null;
+ return String.format("SELECT * FROM %s LIMIT 1", tableName);
}
@Override
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/fixture/H2PipelineSQLBuilder.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/fixture/H2PipelineSQLBuilder.java
deleted file mode 100644
index 06219e9d8ab..00000000000
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/fixture/H2PipelineSQLBuilder.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.common.sqlbuilder.fixture;
-
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLSegmentBuilder;
-import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
-
-import java.util.Optional;
-
-public final class H2PipelineSQLBuilder implements DialectPipelineSQLBuilder {
-
- @Override
- public String buildCheckEmptySQL(final String schemaName, final String
tableName) {
- return String.format("SELECT * FROM %s LIMIT 1", new
PipelineSQLSegmentBuilder(getType()).getQualifiedTableName(schemaName,
tableName));
- }
-
- @Override
- public Optional<String> buildEstimatedCountSQL(final String schemaName,
final String tableName) {
- return Optional.empty();
- }
-
- @Override
- public String getDatabaseType() {
- return "H2";
- }
-}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourceCheckerTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourceCheckerTest.java
index 25209cd1af4..0a912bb59a4 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourceCheckerTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourceCheckerTest.java
@@ -73,7 +73,7 @@ class AbstractDataSourceCheckerTest {
@Override
public String getDatabaseType() {
- return "H2";
+ return "FIXTURE";
}
};
dataSources = new LinkedList<>();
diff --git
a/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
b/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
index 21e298f9649..896696eea95 100644
---
a/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
+++
b/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
@@ -16,4 +16,3 @@
#
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.fixture.FixturePipelineSQLBuilder
-org.apache.shardingsphere.data.pipeline.common.sqlbuilder.fixture.H2PipelineSQLBuilder
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index 8f66b06ed90..74069a1af95 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -53,6 +53,12 @@ public final class MySQLPipelineSQLBuilder implements
DialectPipelineSQLBuilder
return String.format("SELECT * FROM %s LIMIT 1", new
PipelineSQLSegmentBuilder(getType()).getQualifiedTableName(schemaName,
tableName));
}
+ @Override
+ public Optional<String> buildEstimatedCountSQL(final String schemaName,
final String tableName) {
+ return Optional.of(String.format("SELECT TABLE_ROWS FROM
INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = '%s'",
+ new
PipelineSQLSegmentBuilder(getType()).getQualifiedTableName(schemaName,
tableName)));
+ }
+
@Override
public Optional<String> buildCRC32SQL(final String schemaName, final
String tableName, final String column) {
PipelineSQLSegmentBuilder sqlSegmentBuilder = new
PipelineSQLSegmentBuilder(getType());
@@ -60,12 +66,6 @@ public final class MySQLPipelineSQLBuilder implements
DialectPipelineSQLBuilder
sqlSegmentBuilder.getEscapedIdentifier(column),
sqlSegmentBuilder.getEscapedIdentifier(tableName)));
}
- @Override
- public Optional<String> buildEstimatedCountSQL(final String schemaName,
final String tableName) {
- return Optional.of(String.format("SELECT TABLE_ROWS FROM
INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = '%s'",
- new
PipelineSQLSegmentBuilder(getType()).getQualifiedTableName(schemaName,
tableName)));
- }
-
@Override
public String getDatabaseType() {
return "MySQL";
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 63c85329450..f1bb2dc9599 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -52,7 +52,7 @@ import
org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.CommonPipelineSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
@@ -395,14 +395,14 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
private void cleanTempTableOnRollback(final String jobId) throws
SQLException {
MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
- PipelineSQLBuilderEngine sqlBuilderEngine = new
PipelineSQLBuilderEngine(jobConfig.getTargetDatabaseType());
+ CommonPipelineSQLBuilder pipelineSQLBuilder = new
CommonPipelineSQLBuilder(jobConfig.getTargetDatabaseType());
TableNameSchemaNameMapping mapping = new
TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
try (
PipelineDataSourceWrapper dataSource =
PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
Connection connection = dataSource.getConnection()) {
for (String each : jobConfig.getTargetTableNames()) {
String targetSchemaName = mapping.getSchemaName(each);
- String sql = sqlBuilderEngine.buildDropSQL(targetSchemaName,
each);
+ String sql = pipelineSQLBuilder.buildDropSQL(targetSchemaName,
each);
log.info("cleanTempTableOnRollback, targetSchemaName={},
targetTableName={}, sql={}", targetSchemaName, each, sql);
try (Statement statement = connection.createStatement()) {
statement.execute(sql);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
index eb048eff7f6..c92ca17c988 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLSegmentBuilder;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
import java.util.Optional;
@@ -26,7 +25,7 @@ public final class H2PipelineSQLBuilder implements
DialectPipelineSQLBuilder {
@Override
public String buildCheckEmptySQL(final String schemaName, final String
tableName) {
- return String.format("SELECT * FROM %s LIMIT 1", new
PipelineSQLSegmentBuilder(getType()).getQualifiedTableName(schemaName,
tableName));
+ return String.format("SELECT * FROM %s LIMIT 1", tableName);
}
@Override