This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c0cd4ca4dad Split PipelineDataConsistencyCalculateSQLBuilder from
PipelineCommonSQLBuilder (#27229)
c0cd4ca4dad is described below
commit c0cd4ca4dad872e7ca03d59c803c72fb942b6034
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Jul 16 13:42:05 2023 +0800
Split PipelineDataConsistencyCalculateSQLBuilder from
PipelineCommonSQLBuilder (#27229)
---
.../sqlbuilder/PipelineCommonSQLBuilder.java | 33 ----------
...ipelineDataConsistencyCalculateSQLBuilder.java} | 77 ++--------------------
...RC32MatchDataConsistencyCalculateAlgorithm.java | 6 +-
...DataMatchDataConsistencyCalculateAlgorithm.java | 4 +-
...ineDataConsistencyCalculateSQLBuilderTest.java} | 4 +-
5 files changed, 12 insertions(+), 112 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineCommonSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineCommonSQLBuilder.java
index e161dd45f4d..462c075fa61 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineCommonSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineCommonSQLBuilder.java
@@ -21,9 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQL
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
-import java.util.Collection;
import java.util.Optional;
-import java.util.stream.Collectors;
/**
* Pipeline common SQL builder.
@@ -95,25 +93,6 @@ public final class PipelineCommonSQLBuilder {
return String.format("SELECT MIN(%s), MAX(%s) FROM %s",
escapedUniqueKey, escapedUniqueKey,
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName));
}
- /**
- * Build query all ordering SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @param columnNames column names
- * @param uniqueKey unique key, it may be primary key, not null
- * @param firstQuery first query
- * @return query SQL
- */
- public String buildQueryAllOrderingSQL(final String schemaName, final
String tableName, final Collection<String> columnNames, final String uniqueKey,
final boolean firstQuery) {
- String qualifiedTableName =
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
- String escapedUniqueKey =
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
- String queryColumns =
columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
- return firstQuery
- ? String.format("SELECT %s FROM %s ORDER BY %s ASC",
queryColumns, qualifiedTableName, escapedUniqueKey)
- : String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s
ASC", queryColumns, qualifiedTableName, escapedUniqueKey, escapedUniqueKey);
- }
-
/**
* Build check empty SQL.
*
@@ -124,16 +103,4 @@ public final class PipelineCommonSQLBuilder {
public String buildCheckEmptySQL(final String schemaName, final String
tableName) {
return
dialectSQLBuilder.buildCheckEmptySQL(sqlSegmentBuilder.getQualifiedTableName(schemaName,
tableName));
}
-
- /**
- * Build CRC32 SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @param columnName column name
- * @return CRC32 SQL
- */
- public Optional<String> buildCRC32SQL(final String schemaName, final
String tableName, final String columnName) {
- return
dialectSQLBuilder.buildCRC32SQL(sqlSegmentBuilder.getQualifiedTableName(schemaName,
tableName), sqlSegmentBuilder.getEscapedIdentifier(columnName));
- }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineCommonSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilder.java
similarity index 54%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineCommonSQLBuilder.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilder.java
index e161dd45f4d..e086110dfa2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineCommonSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilder.java
@@ -26,75 +26,19 @@ import java.util.Optional;
import java.util.stream.Collectors;
/**
- * Pipeline common SQL builder.
+ * Pipeline data consistency calculate SQL builder.
*/
-public final class PipelineCommonSQLBuilder {
+public final class PipelineDataConsistencyCalculateSQLBuilder {
private final DialectPipelineSQLBuilder dialectSQLBuilder;
private final PipelineSQLSegmentBuilder sqlSegmentBuilder;
- public PipelineCommonSQLBuilder(final DatabaseType databaseType) {
+ public PipelineDataConsistencyCalculateSQLBuilder(final DatabaseType
databaseType) {
dialectSQLBuilder =
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class,
databaseType);
sqlSegmentBuilder = new PipelineSQLSegmentBuilder(databaseType);
}
- /**
- * Build create schema SQL.
- *
- * @param schemaName schema name
- * @return create schema SQL
- */
- public Optional<String> buildCreateSchemaSQL(final String schemaName) {
- return
dialectSQLBuilder.buildCreateSchemaSQL(sqlSegmentBuilder.getEscapedIdentifier(schemaName));
- }
-
- /**
- * Build drop SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @return drop SQL
- */
- public String buildDropSQL(final String schemaName, final String
tableName) {
- return String.format("DROP TABLE IF EXISTS %s",
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName));
- }
-
- /**
- * Build count SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @return count SQL
- */
- public String buildCountSQL(final String schemaName, final String
tableName) {
- return String.format("SELECT COUNT(*) FROM %s",
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName));
- }
-
- /**
- * Build estimated count SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @return estimated count SQL
- */
- public Optional<String> buildEstimatedCountSQL(final String schemaName,
final String tableName) {
- return
dialectSQLBuilder.buildEstimatedCountSQL(sqlSegmentBuilder.getQualifiedTableName(schemaName,
tableName));
- }
-
- /**
- * Build unique key minimum maximum values SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @param uniqueKey unique key
- * @return min max unique key SQL
- */
- public String buildUniqueKeyMinMaxValuesSQL(final String schemaName, final
String tableName, final String uniqueKey) {
- String escapedUniqueKey =
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
- return String.format("SELECT MIN(%s), MAX(%s) FROM %s",
escapedUniqueKey, escapedUniqueKey,
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName));
- }
-
/**
* Build query all ordering SQL.
*
@@ -103,7 +47,7 @@ public final class PipelineCommonSQLBuilder {
* @param columnNames column names
* @param uniqueKey unique key, it may be primary key, not null
* @param firstQuery first query
- * @return query SQL
+ * @return built SQL
*/
public String buildQueryAllOrderingSQL(final String schemaName, final
String tableName, final Collection<String> columnNames, final String uniqueKey,
final boolean firstQuery) {
String qualifiedTableName =
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
@@ -114,24 +58,13 @@ public final class PipelineCommonSQLBuilder {
: String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s
ASC", queryColumns, qualifiedTableName, escapedUniqueKey, escapedUniqueKey);
}
- /**
- * Build check empty SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @return check SQL
- */
- public String buildCheckEmptySQL(final String schemaName, final String
tableName) {
- return
dialectSQLBuilder.buildCheckEmptySQL(sqlSegmentBuilder.getQualifiedTableName(schemaName,
tableName));
- }
-
/**
* Build CRC32 SQL.
*
* @param schemaName schema name
* @param tableName table name
* @param columnName column name
- * @return CRC32 SQL
+ * @return built SQL
*/
public Optional<String> buildCRC32SQL(final String schemaName, final
String tableName, final String columnName) {
return
dialectSQLBuilder.buildCRC32SQL(sqlSegmentBuilder.getQualifiedTableName(schemaName,
tableName), sqlSegmentBuilder.getEscapedIdentifier(columnName));
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index ea4da251a6e..59de2c9b31f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.algorithm;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineDataConsistencyCalculateSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataConsistencyCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
@@ -53,12 +53,12 @@ public final class
CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractD
@Override
public Iterable<DataConsistencyCalculatedResult> calculate(final
DataConsistencyCalculateParameter param) {
DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, param.getDatabaseType());
- PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(databaseType);
+ PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder = new
PipelineDataConsistencyCalculateSQLBuilder(databaseType);
List<CalculatedItem> calculatedItems =
param.getColumnNames().stream().map(each -> calculateCRC32(pipelineSQLBuilder,
param, each)).collect(Collectors.toList());
return Collections.singletonList(new
CalculatedResult(calculatedItems.get(0).getRecordsCount(),
calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
}
- private CalculatedItem calculateCRC32(final PipelineCommonSQLBuilder
pipelineSQLBuilder, final DataConsistencyCalculateParameter param, final String
columnName) {
+ private CalculatedItem calculateCRC32(final
PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder, final
DataConsistencyCalculateParameter param, final String columnName) {
Optional<String> sql =
pipelineSQLBuilder.buildCRC32SQL(param.getSchemaName(),
param.getLogicTableName(), columnName);
ShardingSpherePreconditions.checkState(sql.isPresent(), () -> new
UnsupportedCRC32DataConsistencyCalculateAlgorithmException(param.getDatabaseType()));
try (
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 90a1a81b8dd..e9763dd6afe 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.algorithm;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineDataConsistencyCalculateSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataConsistencyCalculatedResult;
@@ -165,7 +165,7 @@ public final class
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
throw new UnsupportedOperationException("Data consistency of
DATA_MATCH type not support table without unique key and primary key now");
}
DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, param.getDatabaseType());
- PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(databaseType);
+ PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder = new
PipelineDataConsistencyCalculateSQLBuilder(databaseType);
Collection<String> columnNames = param.getColumnNames().isEmpty() ?
Collections.singleton("*") : param.getColumnNames();
boolean firstQuery = null == param.getTableCheckPosition();
return
pipelineSQLBuilder.buildQueryAllOrderingSQL(param.getSchemaName(),
param.getLogicTableName(), columnNames, param.getUniqueKey().getName(),
firstQuery);
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineCommonSQLBuilderTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilderTest.java
similarity index 90%
rename from
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineCommonSQLBuilderTest.java
rename to
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilderTest.java
index fd46be22965..69cbc899db5 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineCommonSQLBuilderTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilderTest.java
@@ -27,9 +27,9 @@ import java.util.Collections;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-class PipelineCommonSQLBuilderTest {
+class PipelineDataConsistencyCalculateSQLBuilderTest {
- private final PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(TypedSPILoader.getService(DatabaseType.class,
"FIXTURE"));
+ private final PipelineDataConsistencyCalculateSQLBuilder
pipelineSQLBuilder = new
PipelineDataConsistencyCalculateSQLBuilder(TypedSPILoader.getService(DatabaseType.class,
"FIXTURE"));
@Test
void assertBuildQueryAllOrderingSQLFirstQuery() {