This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d91ffcb Add DATA_MATCH data consistency checker impl, support chunked
streaming data consistency check, refactor data consitency check SPI definition
and process (#14304)
d91ffcb is described below
commit d91ffcb5a9489e0e02d9a2230f9cdd2cf9e69381
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Dec 25 11:00:32 2021 +0800
Add DATA_MATCH data consistency checker impl, support chunked streaming
data consistency check, refactor data consitency check SPI definition and
process (#14304)
* Refactor SingleTableDataCalculator.getDatabaseTypes to support multiple
databases
* Refactor DEFAULT DataConsistencyCheckAlgorithm to CRC32_MATCH
* Add DATA_MATCH data consistency checker definition
* Refactor CRC32_MATCH impl package
* Add DataMatchSingleTableDataCalculator simple impl
* Reuse data source in checkCount
* Reuse data source in dataCheck
* Support chunked data consistency check
* Support chunked data calculation for DATA_MATCH
* Refactor data consistency checker and calculator method signature
* Prepare chunk-size refactoring
* Extract abstract streaming data calculator
* Refactor data consistency check algorithm properties initialization
* Set up uniqueKey for data consistency check
* Enable rate limit for data consistency check
* Real test
* Enable range query for inline sharding algorithm when data consistency
check
* Add calculate param for extension
* Fix checkstyle
* Unit test
---
.../resources/yaml/encrypt-dataConverters.yaml | 2 +-
.../infra/database/type/DatabaseTypeRegistry.java | 11 ++
.../ShardingSphereJDBCDataSourceCreator.java | 15 ++-
.../check/consistency/DataCalculateParameter.java | 33 ++++--
.../consistency/DataConsistencyCheckResult.java | 16 +--
.../pipeline/core/api/impl/PipelineJobAPIImpl.java | 17 +--
.../check/consistency/DataConsistencyChecker.java | 12 +-
.../consistency/DataConsistencyCheckerImpl.java | 103 ++++++++++-------
.../SingleTableDataCalculatorRegistry.java | 12 +-
.../core/datasource/DataSourceFactory.java | 5 +-
.../core/datasource/DataSourceManager.java | 1 +
.../core/datasource/DataSourceWrapper.java | 5 +
.../ingest/dumper/AbstractInventoryDumper.java | 1 +
.../AbstractDataConsistencyCheckAlgorithm.java} | 37 +++---
.../AbstractSingleTableDataCalculator.java | 26 +++--
...AbstractStreamingSingleTableDataCalculator.java | 101 ++++++++++++++++
.../CRC32MatchDataConsistencyCheckAlgorithm.java} | 26 +----
.../DataMatchDataConsistencyCheckAlgorithm.java | 28 +++--
.../DataMatchSingleTableDataCalculator.java | 128 +++++++++++++++++++++
.../sqlbuilder/AbstractPipelineSQLBuilder.java | 8 ++
...dleRuleAlteredJobCompletionDetectAlgorithm.java | 14 ++-
.../consistency/DataConsistencyCheckAlgorithm.java | 2 +-
.../consistency/SingleTableDataCalculator.java | 36 ++++--
.../spi/sqlbuilder/PipelineSQLBuilder.java | 10 ++
...check.consistency.DataConsistencyCheckAlgorithm | 3 +-
...spi.check.consistency.SingleTableDataCalculator | 2 +-
.../src/main/resources/conf/config-sharding.yaml | 15 ++-
... CRC32MatchMySQLSingleTableDataCalculator.java} | 43 +++----
...spi.check.consistency.SingleTableDataCalculator | 2 +-
...2MatchPostgreSQLSingleTableDataCalculator.java} | 25 ++--
...spi.check.consistency.SingleTableDataCalculator | 2 +-
.../handler/CheckScalingQueryResultSet.java | 10 +-
...owScalingCheckAlgorithmsQueryResultSetTest.java | 22 ++--
.../pipeline/api/impl/PipelineJobAPIImplTest.java | 14 +--
.../DataConsistencyCheckerImplTest.java | 8 +-
.../core/datasource/DataSourceWrapperTest.java | 11 +-
.../FixtureH2SingleTableDataCalculator.java | 15 ++-
...C32MatchDataConsistencyCheckAlgorithmTest.java} | 10 +-
38 files changed, 602 insertions(+), 229 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
index 63761f4..bd3cb68 100644
---
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
+++
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
@@ -34,6 +34,6 @@ dataConverters:
dataConsistencyChecker:
type: DATA_MATCH
props:
- chunk-size: 10000
+ chunk-size: 1000
checkoutLocker:
type: DEFAULT
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeRegistry.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeRegistry.java
index 8ef032b..664b34d 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeRegistry.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeRegistry.java
@@ -21,6 +21,8 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -94,4 +96,13 @@ public final class DatabaseTypeRegistry {
public static DatabaseType getDefaultDatabaseType() {
return DATABASE_TYPES.get(DEFAULT_DATABASE_TYPE);
}
+
+ /**
+ * Get names of all database types.
+ *
+ * @return database type names
+ */
+ public static Collection<String> getDatabaseTypeNames() {
+ return Collections.unmodifiableSet(DATABASE_TYPES.keySet());
+ }
}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/config/datasource/ShardingSphereJDBCDataSourceCreator.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/config/datasource/ShardingSphereJDBCDataSourceCreator.java
index 4e47a1b..71a5f8b 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/config/datasource/ShardingSphereJDBCDataSourceCreator.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/config/datasource/ShardingSphereJDBCDataSourceCreator.java
@@ -18,10 +18,12 @@
package org.apache.shardingsphere.driver.config.datasource;
import org.apache.shardingsphere.driver.api.ShardingSphereDataSourceFactory;
+import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.jdbc.config.impl.ShardingSphereJDBCDataSourceConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.jdbc.creator.JDBCDataSourceCreator;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import
org.apache.shardingsphere.sharding.yaml.swapper.ShardingRuleConfigurationConverter;
import javax.sql.DataSource;
@@ -36,8 +38,19 @@ public final class ShardingSphereJDBCDataSourceCreator
implements JDBCDataSource
@Override
public DataSource createDataSource(final Object dataSourceConfig) throws
SQLException {
YamlRootConfiguration rootConfig = (YamlRootConfiguration)
dataSourceConfig;
+ ShardingRuleConfiguration shardingRuleConfig =
ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(rootConfig.getRules());
+ enableRangeQueryForInline(shardingRuleConfig);
return
ShardingSphereDataSourceFactory.createDataSource(rootConfig.getSchemaName(),
new
YamlDataSourceConfigurationSwapper().swapToDataSources(rootConfig.getDataSources()),
-
Collections.singletonList(ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(rootConfig.getRules())),
null);
+ Collections.singletonList(shardingRuleConfig), null);
+ }
+
+ private void enableRangeQueryForInline(final ShardingRuleConfiguration
shardingRuleConfig) {
+ for (ShardingSphereAlgorithmConfiguration each :
shardingRuleConfig.getShardingAlgorithms().values()) {
+ if (!"INLINE".equalsIgnoreCase(each.getType())) {
+ continue;
+ }
+ each.getProps().put("allow-range-query-with-inline-sharding",
"true");
+ }
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataCalculateParameter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataCalculateParameter.java
index 36e7431..ca10b9d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataCalculateParameter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataCalculateParameter.java
@@ -21,7 +21,8 @@ import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
-import
org.apache.shardingsphere.infra.config.datasource.jdbc.config.JDBCDataSourceConfiguration;
+import org.apache.commons.lang3.tuple.Pair;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceWrapper;
import java.util.Collection;
@@ -35,9 +36,10 @@ import java.util.Collection;
public final class DataCalculateParameter {
/**
- * Data source configuration of source side or target side.
+ * Data source of source side or target side.
+ * Do not close it, it will be reused later.
*/
- private JDBCDataSourceConfiguration dataSourceConfig;
+ private DataSourceWrapper dataSource;
private String logicTableName;
@@ -47,27 +49,36 @@ public final class DataCalculateParameter {
private Collection<String> columnNames;
/**
+ * Database type.
+ */
+ private String databaseType;
+
+ /**
* Peer database type.
*/
private String peerDatabaseType;
/**
- * Chunk size of limited records to be calculated in a batch.
+ * It could be primary key.
+ * It could be used in order by clause.
*/
- private Integer chunkSize;
+ private String uniqueKey;
/**
- * Ignored column names.
+ * Used for range query.
+ * If it's configured, then it could be translated to SQL like
"uniqueColumn >= pair.left AND uniqueColumn <= pair.right".
+ * One of left and right of pair could be null.
*/
- private Collection<String> ignoredColumnNames;
+ private Pair<Object, Object> uniqueColumnValueRange;
/**
- * If {@link #chunkSize} exists, it could be used in order by clause on
first priority.
+ * Used for multiple records query.
+ * If it's configured, then it could be translated to SQL like
"uniqueColumn IN (value1,value2,value3)".
*/
- private Collection<String> primaryColumnNames;
+ private Collection<Object> uniqueColumnValues;
/**
- * If {@link #chunkSize} exists, it could be used in order by clause on
second priority.
+ * Previous calculated result will be transferred to next call.
*/
- private Collection<String> uniqueColumnNames;
+ private volatile Object previousCalculatedResult;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
index 20f13b5..895ecac 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
@@ -29,17 +29,17 @@ import lombok.ToString;
@ToString
public final class DataConsistencyCheckResult {
- private final long sourceCount;
+ private final long sourceRecordsCount;
- private final long targetCount;
+ private final long targetRecordsCount;
- private final boolean countValid;
+ private final boolean recordsCountMatched;
- private boolean dataValid;
+ private boolean recordsContentMatched;
- public DataConsistencyCheckResult(final long sourceCount, final long
targetCount) {
- this.sourceCount = sourceCount;
- this.targetCount = targetCount;
- countValid = sourceCount == targetCount;
+ public DataConsistencyCheckResult(final long sourceRecordsCount, final
long targetRecordsCount) {
+ this.sourceRecordsCount = sourceRecordsCount;
+ this.targetRecordsCount = targetRecordsCount;
+ recordsCountMatched = sourceRecordsCount == targetRecordsCount;
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineJobAPIImpl.java
index 5ce51ef..35c4b8f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineJobAPIImpl.java
@@ -248,10 +248,10 @@ public final class PipelineJobAPIImpl implements
PipelineJobAPI {
private Map<String, DataConsistencyCheckResult>
dataConsistencyCheck0(final RuleAlteredJobContext jobContext, final
DataConsistencyCheckAlgorithm checkAlgorithm) {
long jobId = jobContext.getJobId();
DataConsistencyChecker dataConsistencyChecker =
EnvironmentCheckerFactory.newInstance(jobContext);
- Map<String, DataConsistencyCheckResult> result =
dataConsistencyChecker.countCheck();
- if
(result.values().stream().allMatch(DataConsistencyCheckResult::isCountValid)) {
- Map<String, Boolean> dataCheckResult =
dataConsistencyChecker.dataCheck(checkAlgorithm);
- result.forEach((key, value) ->
value.setDataValid(dataCheckResult.getOrDefault(key, false)));
+ Map<String, DataConsistencyCheckResult> result =
dataConsistencyChecker.checkRecordsCount();
+ if
(result.values().stream().allMatch(DataConsistencyCheckResult::isRecordsCountMatched))
{
+ Map<String, Boolean> contentCheckResult =
dataConsistencyChecker.checkRecordsContent(checkAlgorithm);
+ result.forEach((key, value) ->
value.setRecordsContentMatched(contentCheckResult.getOrDefault(key, false)));
}
log.info("Scaling job {} with check algorithm '{}' data consistency
checker result {}", jobId, checkAlgorithm.getClass().getName(), result);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobCheckResult(jobId,
aggregateDataConsistencyCheckResults(jobId, result));
@@ -264,10 +264,11 @@ public final class PipelineJobAPIImpl implements
PipelineJobAPI {
return false;
}
for (Entry<String, DataConsistencyCheckResult> entry :
checkResultMap.entrySet()) {
- boolean isDataValid = entry.getValue().isDataValid();
- boolean isCountValid = entry.getValue().isCountValid();
- if (!isDataValid || !isCountValid) {
- log.error("Scaling job: {}, table: {} data consistency check
failed, dataValid: {}, countValid: {}", jobId, entry.getKey(), isDataValid,
isCountValid);
+ boolean recordsCountMatched =
entry.getValue().isRecordsCountMatched();
+ boolean recordsContentMatched =
entry.getValue().isRecordsContentMatched();
+ if (!recordsContentMatched || !recordsCountMatched) {
+ log.error("Scaling job: {}, table: {} data consistency check
failed, recordsContentMatched: {}, recordsCountMatched: {}",
+ jobId, entry.getKey(), recordsContentMatched,
recordsCountMatched);
return false;
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index b0c78f6..ff70369 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -28,17 +28,17 @@ import java.util.Map;
public interface DataConsistencyChecker {
/**
- * Check each table count is valid.
+ * Check whether each table's records count is valid.
*
- * @return count check result. key is logic table name, value is check
result.
+ * @return records count check result. key is logic table name, value is
check result.
*/
- Map<String, DataConsistencyCheckResult> countCheck();
+ Map<String, DataConsistencyCheckResult> checkRecordsCount();
/**
- * Check each table data is valid.
+ * Check whether each table's records content is valid.
*
* @param checkAlgorithm check algorithm
- * @return data is valid or not. key is logic table name, value is check
result.
+ * @return records content check result. key is logic table name, value is
check result.
*/
- Map<String, Boolean> dataCheck(DataConsistencyCheckAlgorithm
checkAlgorithm);
+ Map<String, Boolean> checkRecordsContent(DataConsistencyCheckAlgorithm
checkAlgorithm);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index 8d8b944..4a4ae9e 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -28,10 +28,13 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.DataCheckFailExcep
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
+import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.infra.config.datasource.jdbc.config.JDBCDataSourceConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.jdbc.config.JDBCDataSourceConfigurationFactory;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import
org.apache.shardingsphere.infra.metadata.schema.builder.loader.common.TableMetaDataLoader;
+import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
import
org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
import javax.sql.DataSource;
@@ -39,12 +42,13 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -68,33 +72,36 @@ public final class DataConsistencyCheckerImpl implements
DataConsistencyChecker
private final RuleAlteredJobContext jobContext;
@Override
- public Map<String, DataConsistencyCheckResult> countCheck() {
+ public Map<String, DataConsistencyCheckResult> checkRecordsCount() {
ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job"
+ jobContext.getJobId() % 10_000 + "-countCheck-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
- try {
+ JDBCDataSourceConfiguration sourceConfig =
JDBCDataSourceConfigurationFactory.newInstance(
+ jobContext.getJobConfig().getRuleConfig().getSource().getType(),
jobContext.getJobConfig().getRuleConfig().getSource().getParameter());
+ JDBCDataSourceConfiguration targetConfig =
JDBCDataSourceConfigurationFactory.newInstance(
+ jobContext.getJobConfig().getRuleConfig().getTarget().getType(),
jobContext.getJobConfig().getRuleConfig().getTarget().getParameter());
+ try (DataSourceWrapper sourceDataSource =
dataSourceFactory.newInstance(sourceConfig);
+ DataSourceWrapper targetDataSource =
dataSourceFactory.newInstance(targetConfig)) {
return jobContext.getTaskConfigs()
- .stream().flatMap(each ->
each.getDumperConfig().getTableNameMap().values().stream()).collect(Collectors.toSet())
- .stream().collect(Collectors.toMap(Function.identity(),
table -> countCheck(table, executor), (oldValue, currentValue) -> oldValue,
LinkedHashMap::new));
+ .stream().flatMap(each ->
each.getDumperConfig().getTableNameMap().values().stream()).collect(Collectors.toSet())
+ .stream().collect(Collectors.toMap(Function.identity(), table
-> countCheck(table, sourceDataSource, targetDataSource, executor),
+ (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
+ } catch (final SQLException ex) {
+ throw new DataCheckFailException("count check failed", ex);
} finally {
executor.shutdown();
executor.shutdownNow();
}
}
- private DataConsistencyCheckResult countCheck(final String table, final
ThreadPoolExecutor executor) {
- JDBCDataSourceConfiguration sourceConfig =
JDBCDataSourceConfigurationFactory.newInstance(
-
jobContext.getJobConfig().getRuleConfig().getSource().getType(),
jobContext.getJobConfig().getRuleConfig().getSource().getParameter());
- JDBCDataSourceConfiguration targetConfig =
JDBCDataSourceConfigurationFactory.newInstance(
-
jobContext.getJobConfig().getRuleConfig().getTarget().getType(),
jobContext.getJobConfig().getRuleConfig().getTarget().getParameter());
- try (DataSourceWrapper sourceDataSource =
dataSourceFactory.newInstance(sourceConfig);
- DataSourceWrapper targetDataSource =
dataSourceFactory.newInstance(targetConfig)) {
- Future<Long> sourceFuture = executor.submit(() ->
count(sourceDataSource, table, sourceConfig.getDatabaseType()));
- Future<Long> targetFuture = executor.submit(() ->
count(targetDataSource, table, targetConfig.getDatabaseType()));
+ private DataConsistencyCheckResult countCheck(final String table, final
DataSourceWrapper sourceDataSource, final DataSourceWrapper targetDataSource,
final ThreadPoolExecutor executor) {
+ try {
+ Future<Long> sourceFuture = executor.submit(() ->
count(sourceDataSource, table, sourceDataSource.getDatabaseType()));
+ Future<Long> targetFuture = executor.submit(() ->
count(targetDataSource, table, targetDataSource.getDatabaseType()));
long sourceCount = sourceFuture.get();
long targetCount = targetFuture.get();
return new DataConsistencyCheckResult(sourceCount, targetCount);
- } catch (final SQLException | InterruptedException |
ExecutionException ex) {
- throw new DataCheckFailException(String.format("table %s count
check failed.", table), ex);
+ } catch (final InterruptedException | ExecutionException ex) {
+ throw new DataCheckFailException("count check failed, table=" +
table, ex);
}
}
@@ -110,7 +117,7 @@ public final class DataConsistencyCheckerImpl implements
DataConsistencyChecker
}
@Override
- public Map<String, Boolean> dataCheck(final DataConsistencyCheckAlgorithm
checkAlgorithm) {
+ public Map<String, Boolean> checkRecordsContent(final
DataConsistencyCheckAlgorithm checkAlgorithm) {
Collection<String> supportedDatabaseTypes =
checkAlgorithm.getSupportedDatabaseTypes();
JDBCDataSourceConfiguration sourceConfig =
JDBCDataSourceConfigurationFactory.newInstance(
jobContext.getJobConfig().getRuleConfig().getSource().getType(),
jobContext.getJobConfig().getRuleConfig().getSource().getParameter());
@@ -119,31 +126,49 @@ public final class DataConsistencyCheckerImpl implements
DataConsistencyChecker
jobContext.getJobConfig().getRuleConfig().getTarget().getType(),
jobContext.getJobConfig().getRuleConfig().getTarget().getParameter());
checkDatabaseTypeSupportedOrNot(supportedDatabaseTypes,
targetConfig.getDatabaseType().getName());
Collection<String> logicTableNames =
jobContext.getTaskConfigs().stream().flatMap(each ->
each.getDumperConfig().getTableNameMap().values().stream()).distinct().collect(Collectors.toList());
- Map<String, Collection<String>> tablesColumnNamesMap =
getTablesColumnNamesMap(sourceConfig);
+ Map<String, TableMetaData> tableMetaDataMap =
getTablesColumnsMap(sourceConfig, logicTableNames);
logicTableNames.forEach(each -> {
//TODO put to preparer
- if (!tablesColumnNamesMap.containsKey(each)) {
+ if (!tableMetaDataMap.containsKey(each)) {
throw new DataCheckFailException(String.format("could not get
table columns for '%s'", each));
}
});
- SingleTableDataCalculator sourceCalculator =
checkAlgorithm.getSingleTableDataCalculator(sourceConfig.getDatabaseType().getName());
- SingleTableDataCalculator targetCalculator =
checkAlgorithm.getSingleTableDataCalculator(targetConfig.getDatabaseType().getName());
+ String sourceDatabaseType = sourceConfig.getDatabaseType().getName();
+ String targetDatabaseType = targetConfig.getDatabaseType().getName();
+ SingleTableDataCalculator sourceCalculator =
checkAlgorithm.getSingleTableDataCalculator(sourceDatabaseType);
+ SingleTableDataCalculator targetCalculator =
checkAlgorithm.getSingleTableDataCalculator(targetDatabaseType);
Map<String, Boolean> result = new HashMap<>();
ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job"
+ jobContext.getJobId() % 10_000 + "-dataCheck-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
- try {
+ JobRateLimitAlgorithm rateLimitAlgorithm =
jobContext.getRuleAlteredContext().getRateLimitAlgorithm();
+ try (DataSourceWrapper sourceDataSource =
dataSourceFactory.newInstance(sourceConfig);
+ DataSourceWrapper targetDataSource =
dataSourceFactory.newInstance(targetConfig)) {
for (String each : logicTableNames) {
- Collection<String> columnNames =
tablesColumnNamesMap.get(each);
- DataCalculateParameter sourceCalculateParameter =
DataCalculateParameter.builder().dataSourceConfig(sourceConfig).logicTableName(each).columnNames(columnNames).build();
- Future<Object> sourceFuture = executor.submit(() ->
sourceCalculator.dataCalculate(sourceCalculateParameter));
- DataCalculateParameter targetCalculateParameter =
DataCalculateParameter.builder().dataSourceConfig(targetConfig).logicTableName(each).columnNames(columnNames).build();
- Future<Object> targetFuture = executor.submit(() ->
targetCalculator.dataCalculate(targetCalculateParameter));
- Object sourceCalculateResult = sourceFuture.get();
- Object targetCalculateResult = targetFuture.get();
- boolean calculateResultsEquals =
Objects.equals(sourceCalculateResult, targetCalculateResult);
+ Collection<String> columnNames =
tableMetaDataMap.get(each).getColumns().keySet();
+ String uniqueKey =
tableMetaDataMap.get(each).getPrimaryKeyColumns().get(0);
+ DataCalculateParameter sourceCalculateParameter =
DataCalculateParameter.builder().dataSource(sourceDataSource).databaseType(sourceDatabaseType).peerDatabaseType(targetDatabaseType)
+
.logicTableName(each).columnNames(columnNames).uniqueKey(uniqueKey).build();
+ DataCalculateParameter targetCalculateParameter =
DataCalculateParameter.builder().dataSource(targetDataSource).databaseType(targetDatabaseType).peerDatabaseType(sourceDatabaseType)
+
.logicTableName(each).columnNames(columnNames).uniqueKey(uniqueKey).build();
+ Iterator<Object> sourceCalculatedResultIterator =
sourceCalculator.calculate(sourceCalculateParameter).iterator();
+ Iterator<Object> targetCalculatedResultIterator =
targetCalculator.calculate(targetCalculateParameter).iterator();
+ boolean calculateResultsEquals = true;
+ while (sourceCalculatedResultIterator.hasNext() &&
targetCalculatedResultIterator.hasNext()) {
+ if (null != rateLimitAlgorithm) {
+ rateLimitAlgorithm.onQuery();
+ }
+ Future<Object> sourceFuture =
executor.submit(sourceCalculatedResultIterator::next);
+ Future<Object> targetFuture =
executor.submit(targetCalculatedResultIterator::next);
+ Object sourceCalculatedResult = sourceFuture.get();
+ Object targetCalculatedResult = targetFuture.get();
+ calculateResultsEquals =
Objects.equals(sourceCalculatedResult, targetCalculatedResult);
+ if (!calculateResultsEquals) {
+ break;
+ }
+ }
result.put(each, calculateResultsEquals);
}
- } catch (final ExecutionException | InterruptedException ex) {
+ } catch (final ExecutionException | InterruptedException |
SQLException ex) {
throw new DataCheckFailException("data check failed");
} finally {
executor.shutdown();
@@ -158,14 +183,14 @@ public final class DataConsistencyCheckerImpl implements
DataConsistencyChecker
}
}
- private Map<String, Collection<String>> getTablesColumnNamesMap(final
JDBCDataSourceConfiguration dataSourceConfig) {
- try (DataSourceWrapper dataSource =
dataSourceFactory.newInstance(dataSourceConfig);
- Connection connection = dataSource.getConnection()) {
- Map<String, Collection<String>> result = new LinkedHashMap<>();
- try (ResultSet resultSet =
connection.getMetaData().getColumns(connection.getCatalog(), null, "%", "%")) {
- while (resultSet.next()) {
- result.computeIfAbsent(resultSet.getString("TABLE_NAME"),
tableName -> new ArrayList<>()).add(resultSet.getString("COLUMN_NAME"));
- }
+ // TODO reuse metadata
+ private Map<String, TableMetaData> getTablesColumnsMap(final
JDBCDataSourceConfiguration dataSourceConfig, final Collection<String>
tableNames) {
+ try (DataSourceWrapper dataSource =
dataSourceFactory.newInstance(dataSourceConfig)) {
+ Map<String, TableMetaData> result = new LinkedHashMap<>();
+ for (String each : tableNames) {
+ Optional<TableMetaData> tableMetaDataOptional =
TableMetaDataLoader.load(dataSource, each, dataSourceConfig.getDatabaseType());
+ TableMetaData tableMetaData =
tableMetaDataOptional.orElseThrow(() -> new DataCheckFailException("get table
metadata failed, tableName=" + each));
+ result.put(each, tableMetaData);
}
return result;
} catch (final SQLException ex) {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableDataCalculatorRegistry.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableDataCalculatorRegistry.java
index 222e7e2..cf0dcc6 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableDataCalculatorRegistry.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableDataCalculatorRegistry.java
@@ -37,11 +37,13 @@ public final class SingleTableDataCalculatorRegistry {
static {
ShardingSphereServiceLoader.register(SingleTableDataCalculator.class);
for (SingleTableDataCalculator each :
ShardingSphereServiceLoader.getSingletonServiceInstances(SingleTableDataCalculator.class))
{
- SingleTableDataCalculator replaced =
ALGORITHM_DATABASE_CALCULATOR_MAP.computeIfAbsent(each.getAlgorithmType(),
algorithmType -> new HashMap<>())
- .put(each.getDatabaseType(), each);
- if (null != replaced) {
- log.info("element replaced, algorithmType={}, databaseType={},
current={}, replaced={}",
- each.getAlgorithmType(), each.getDatabaseType(),
each.getClass().getName(), replaced.getClass().getName());
+ Map<String, SingleTableDataCalculator> dataCalculatorMap =
ALGORITHM_DATABASE_CALCULATOR_MAP.computeIfAbsent(each.getAlgorithmType(),
algorithmType -> new HashMap<>());
+ for (String databaseType : each.getDatabaseTypes()) {
+ SingleTableDataCalculator replaced =
dataCalculatorMap.put(databaseType, each);
+ if (null != replaced) {
+ log.warn("element replaced, algorithmType={},
databaseTypes={}, current={}, replaced={}",
+ each.getAlgorithmType(), each.getDatabaseTypes(),
each.getClass().getName(), replaced.getClass().getName());
+ }
}
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceFactory.java
index ac0b209..293e567 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceFactory.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceFactory.java
@@ -21,6 +21,7 @@ import lombok.SneakyThrows;
import
org.apache.shardingsphere.infra.config.datasource.jdbc.config.JDBCDataSourceConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.jdbc.creator.JDBCDataSourceCreatorFactory;
+import javax.sql.DataSource;
import java.sql.SQLException;
/**
@@ -36,6 +37,8 @@ public final class DataSourceFactory {
*/
@SneakyThrows(SQLException.class)
public DataSourceWrapper newInstance(final JDBCDataSourceConfiguration
dataSourceConfig) {
- return new
DataSourceWrapper(JDBCDataSourceCreatorFactory.getInstance(dataSourceConfig.getType()).createDataSource(dataSourceConfig.getDataSourceConfiguration()));
+ // TODO cache and reuse, try DataSourceManager
+ DataSource dataSource =
JDBCDataSourceCreatorFactory.getInstance(dataSourceConfig.getType()).createDataSource(dataSourceConfig.getDataSourceConfiguration());
+ return new DataSourceWrapper(dataSource,
dataSourceConfig.getDatabaseType());
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManager.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManager.java
index a335e38..0bf76a3 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManager.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManager.java
@@ -71,6 +71,7 @@ public final class DataSourceManager implements AutoCloseable
{
* @return data source
*/
public DataSourceWrapper getDataSource(final JDBCDataSourceConfiguration
dataSourceConfig) {
+ // TODO re-init if existing dataSource was closed
if (cachedDataSources.containsKey(dataSourceConfig)) {
return cachedDataSources.get(dataSourceConfig);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceWrapper.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceWrapper.java
index 47b4d2e..e70326f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceWrapper.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceWrapper.java
@@ -17,8 +17,10 @@
package org.apache.shardingsphere.data.pipeline.core.datasource;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
import javax.sql.DataSource;
import java.io.PrintWriter;
@@ -36,6 +38,9 @@ public final class DataSourceWrapper implements DataSource,
AutoCloseable {
private final DataSource dataSource;
+ @Getter
+ private final DatabaseType databaseType;
+
@Override
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 44b0deb..ebd453a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -82,6 +82,7 @@ public abstract class AbstractInventoryDumper extends
AbstractLifecycleExecutor
private TableMetaData createTableMetaData() {
JDBCDataSourceConfiguration dataSourceConfig =
inventoryDumperConfig.getDataSourceConfig();
+ // TODO share MetaDataManager
MetaDataManager metaDataManager = new
MetaDataManager(dataSourceManager.getDataSource(dataSourceConfig));
return
metaDataManager.getTableMetaData(inventoryDumperConfig.getTableName(),
dataSourceConfig.getDatabaseType());
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DefaultDataConsistencyCheckAlgorithm.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractDataConsistencyCheckAlgorithm.java
similarity index 54%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DefaultDataConsistencyCheckAlgorithm.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractDataConsistencyCheckAlgorithm.java
index 51d7794..82de859 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DefaultDataConsistencyCheckAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractDataConsistencyCheckAlgorithm.java
@@ -15,36 +15,33 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
+import
org.apache.shardingsphere.data.pipeline.core.check.consistency.SingleTableDataCalculatorRegistry;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
-import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import java.util.Collection;
-import java.util.Collections;
+import java.util.Properties;
/**
- * Scaling default data consistency check algorithm.
+ * Abstract data consistency check algorithm.
*/
-public final class DefaultDataConsistencyCheckAlgorithm implements
DataConsistencyCheckAlgorithm {
+public abstract class AbstractDataConsistencyCheckAlgorithm implements
DataConsistencyCheckAlgorithm {
- public static final String TYPE = "DEFAULT";
-
- private static final Collection<String> SUPPORTED_DATABASE_TYPES =
Collections.singletonList(new MySQLDatabaseType().getName());
+ private Properties props = new Properties();
@Override
- public void init() {
+ public Properties getProps() {
+ return props;
}
@Override
- public String getDescription() {
- return "Default implementation with CRC32 of all records.";
+ public void setProps(final Properties props) {
+ this.props = props;
}
@Override
- public Collection<String> getSupportedDatabaseTypes() {
- return SUPPORTED_DATABASE_TYPES;
+ public void init() {
}
@Override
@@ -53,12 +50,10 @@ public final class DefaultDataConsistencyCheckAlgorithm
implements DataConsisten
}
@Override
- public SingleTableDataCalculator getSingleTableDataCalculator(final String
supportedDatabaseType) {
- return SingleTableDataCalculatorRegistry.newServiceInstance(TYPE,
supportedDatabaseType);
- }
-
- @Override
- public String getType() {
- return TYPE;
+ public final SingleTableDataCalculator getSingleTableDataCalculator(final
String supportedDatabaseType) {
+ SingleTableDataCalculator result =
SingleTableDataCalculatorRegistry.newServiceInstance(getType(),
supportedDatabaseType);
+ result.setAlgorithmProps(props);
+ result.init();
+ return result;
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/AbstractSingleTableDataCalculator.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractSingleTableDataCalculator.java
similarity index 78%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/AbstractSingleTableDataCalculator.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractSingleTableDataCalculator.java
index 3a7633b..f66a347 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/AbstractSingleTableDataCalculator.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractSingleTableDataCalculator.java
@@ -15,27 +15,39 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
import
org.apache.shardingsphere.infra.config.datasource.jdbc.config.JDBCDataSourceConfiguration;
+import java.util.Properties;
+
/**
* Abstract single table data calculator.
*/
-@RequiredArgsConstructor
-@Getter
-@Slf4j
public abstract class AbstractSingleTableDataCalculator implements
SingleTableDataCalculator {
private final DataSourceFactory dataSourceFactory = new
DataSourceFactory();
+ private Properties algorithmProps;
+
protected final DataSourceWrapper getDataSource(final
JDBCDataSourceConfiguration dataSourceConfig) {
return dataSourceFactory.newInstance(dataSourceConfig);
}
+
+ @Override
+ public Properties getAlgorithmProps() {
+ return algorithmProps;
+ }
+
+ @Override
+ public void setAlgorithmProps(final Properties algorithmProps) {
+ this.algorithmProps = algorithmProps;
+ }
+
+ @Override
+ public void init() {
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractStreamingSingleTableDataCalculator.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractStreamingSingleTableDataCalculator.java
new file mode 100644
index 0000000..ba7b031
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractStreamingSingleTableDataCalculator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
+
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Abstract single table data calculator.
+ */
+@RequiredArgsConstructor
+@Getter
+@Slf4j
+public abstract class AbstractStreamingSingleTableDataCalculator extends
AbstractSingleTableDataCalculator {
+
+ @Override
+ public final Iterable<Object> calculate(final DataCalculateParameter
dataCalculateParameter) {
+ return new ResultIterable(dataCalculateParameter);
+ }
+
+ /**
+ * Calculate chunked records at one time.
+ *
+ * @param dataCalculateParameter data calculate parameter
+ * @return optional calculated result, empty means there's no more result
+ */
+ protected abstract Optional<Object> calculateChunk(DataCalculateParameter
dataCalculateParameter);
+
+ /**
+ * It's not thread-safe, it should be executed in only one thread at the
same time.
+ */
+ @RequiredArgsConstructor
+ final class ResultIterable implements Iterable<Object> {
+
+ private final DataCalculateParameter dataCalculateParameter;
+
+ @Override
+ public Iterator<Object> iterator() {
+ return new ResultIterator(dataCalculateParameter);
+ }
+ }
+
+ @RequiredArgsConstructor
+ final class ResultIterator implements Iterator<Object> {
+
+ private final DataCalculateParameter dataCalculateParameter;
+
+ private final AtomicInteger calculationCount = new AtomicInteger(0);
+
+ private volatile Optional<Object> nextResult;
+
+ @Override
+ public boolean hasNext() {
+ calculateIfNecessary();
+ return nextResult.isPresent();
+ }
+
+ @Override
+ public Object next() {
+ calculateIfNecessary();
+ Optional<Object> nextResult = this.nextResult;
+
dataCalculateParameter.setPreviousCalculatedResult(nextResult.orElse(null));
+ this.nextResult = null;
+ return nextResult;
+ }
+
+ private void calculateIfNecessary() {
+ if (null != nextResult) {
+ return;
+ }
+ nextResult = calculateChunk(dataCalculateParameter);
+ if (!nextResult.isPresent()) {
+ log.info("nextResult not present, calculation done.
calculationCount={}", calculationCount);
+ }
+ if (calculationCount.incrementAndGet() % 100_0000 == 0) {
+ log.warn("possible infinite loop, calculationCount={}",
calculationCount);
+ }
+ }
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DefaultDataConsistencyCheckAlgorithm.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCheckAlgorithm.java
similarity index 60%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DefaultDataConsistencyCheckAlgorithm.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCheckAlgorithm.java
index 51d7794..e65e7c1 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DefaultDataConsistencyCheckAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCheckAlgorithm.java
@@ -15,31 +15,25 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
-import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import java.util.Collection;
import java.util.Collections;
/**
- * Scaling default data consistency check algorithm.
+ * CRC32 match implementation of data consistency check algorithm.
*/
-public final class DefaultDataConsistencyCheckAlgorithm implements
DataConsistencyCheckAlgorithm {
+public final class CRC32MatchDataConsistencyCheckAlgorithm extends
AbstractDataConsistencyCheckAlgorithm {
- public static final String TYPE = "DEFAULT";
+ public static final String TYPE = "CRC32_MATCH";
private static final Collection<String> SUPPORTED_DATABASE_TYPES =
Collections.singletonList(new MySQLDatabaseType().getName());
@Override
- public void init() {
- }
-
- @Override
public String getDescription() {
- return "Default implementation with CRC32 of all records.";
+ return "Match CRC32 of records.";
}
@Override
@@ -48,16 +42,6 @@ public final class DefaultDataConsistencyCheckAlgorithm
implements DataConsisten
}
@Override
- public String getProvider() {
- return "ShardingSphere";
- }
-
- @Override
- public SingleTableDataCalculator getSingleTableDataCalculator(final String
supportedDatabaseType) {
- return SingleTableDataCalculatorRegistry.newServiceInstance(TYPE,
supportedDatabaseType);
- }
-
- @Override
public String getType() {
return TYPE;
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureH2SingleTableDataCalculator.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchDataConsistencyCheckAlgorithm.java
similarity index 53%
copy from
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureH2SingleTableDataCalculator.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchDataConsistencyCheckAlgorithm.java
index 0c4792d..d1207de 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureH2SingleTableDataCalculator.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchDataConsistencyCheckAlgorithm.java
@@ -15,25 +15,33 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.fixture;
+package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
-import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
-import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
-public final class FixtureH2SingleTableDataCalculator implements
SingleTableDataCalculator {
+import java.util.Collection;
+
+/**
+ * Data match implementation of data consistency check algorithm.
+ */
+public final class DataMatchDataConsistencyCheckAlgorithm extends
AbstractDataConsistencyCheckAlgorithm {
+
+ public static final String TYPE = "DATA_MATCH";
+
+ private static final Collection<String> SUPPORTED_DATABASE_TYPES =
DatabaseTypeRegistry.getDatabaseTypeNames();
@Override
- public String getAlgorithmType() {
- return FixtureDataConsistencyCheckAlgorithm.TYPE;
+ public String getDescription() {
+ return "Match raw data of records.";
}
@Override
- public String getDatabaseType() {
- return "H2";
+ public Collection<String> getSupportedDatabaseTypes() {
+ return SUPPORTED_DATABASE_TYPES;
}
@Override
- public Object dataCalculate(final DataCalculateParameter
dataCalculateParameter) {
- return true;
+ public String getType() {
+ return TYPE;
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
new file mode 100644
index 0000000..3dd8b2b
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
+
+import com.google.common.base.Strings;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
+import
org.apache.shardingsphere.data.pipeline.core.exception.DataCheckFailException;
+import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import
org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Data match implementation of single table data calculator.
+ */
+@Slf4j
+public final class DataMatchSingleTableDataCalculator extends
AbstractStreamingSingleTableDataCalculator {
+
+ private static final Collection<String> DATABASE_TYPES =
DatabaseTypeRegistry.getDatabaseTypeNames();
+
+ private static final String CHUNK_SIZE_KEY = "chunk-size";
+
+ private volatile int chunkSize = 1000;
+
+ @Override
+ public String getAlgorithmType() {
+ return DataMatchDataConsistencyCheckAlgorithm.TYPE;
+ }
+
+ @Override
+ public Collection<String> getDatabaseTypes() {
+ return DATABASE_TYPES;
+ }
+
+ @Override
+ public void init() {
+ Properties algorithmProps = getAlgorithmProps();
+ String chunkSizeValue = algorithmProps.getProperty(CHUNK_SIZE_KEY);
+ if (!Strings.isNullOrEmpty(chunkSizeValue)) {
+ int chunkSize = Integer.parseInt(chunkSizeValue);
+ if (chunkSize <= 0) {
+ log.warn("invalid chunkSize={}, use default value", chunkSize);
+ }
+ this.chunkSize = chunkSize;
+ }
+ }
+
+ @Override
+ protected Optional<Object> calculateChunk(final DataCalculateParameter
dataCalculateParameter) {
+ String logicTableName = dataCalculateParameter.getLogicTableName();
+ PipelineSQLBuilder sqlBuilder =
ScalingSQLBuilderFactory.newInstance(dataCalculateParameter.getDatabaseType());
+ String uniqueKey = dataCalculateParameter.getUniqueKey();
+ CalculatedResult previousCalculatedResult = (CalculatedResult)
dataCalculateParameter.getPreviousCalculatedResult();
+ Number startUniqueValue = null != previousCalculatedResult ?
previousCalculatedResult.getMaxUniqueValue() : -1;
+ String sql = sqlBuilder.buildChunkedQuerySQL(logicTableName,
uniqueKey, startUniqueValue);
+ try {
+ return query(dataCalculateParameter.getDataSource(), sql,
uniqueKey, startUniqueValue, chunkSize);
+ } catch (final SQLException ex) {
+ throw new DataCheckFailException(String.format("table %s data
check failed.", logicTableName), ex);
+ }
+ }
+
+ private Optional<Object> query(final DataSource dataSource, final String
sql, final String uniqueKey, final Number startUniqueValue, final int
chunkSize) throws SQLException {
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
+ preparedStatement.setObject(1, startUniqueValue);
+ preparedStatement.setInt(2, chunkSize);
+ Collection<Collection<Object>> records = new
ArrayList<>(chunkSize);
+ Number maxUniqueValue = null;
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ while (resultSet.next()) {
+ ResultSetMetaData resultSetMetaData =
resultSet.getMetaData();
+ int columnCount = resultSetMetaData.getColumnCount();
+ Collection<Object> record = new ArrayList<>(columnCount);
+ for (int columnIndex = 1; columnIndex <= columnCount;
columnIndex++) {
+ record.add(resultSet.getObject(columnIndex));
+ }
+ records.add(record);
+ maxUniqueValue = (Number) resultSet.getObject(uniqueKey);
+ }
+ }
+ return records.isEmpty() ? Optional.empty() : Optional.of(new
CalculatedResult(maxUniqueValue, records.size(), records));
+ }
+ }
+
+ @RequiredArgsConstructor
+ @Getter
+ @EqualsAndHashCode
+ private static final class CalculatedResult {
+
+ @NonNull
+ private final Number maxUniqueValue;
+
+ private final int recordCount;
+
+ private final Collection<Collection<Object>> records;
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index 8bb4ccb..2a81ac5 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+import com.google.common.base.Preconditions;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -144,6 +145,13 @@ public abstract class AbstractPipelineSQLBuilder
implements PipelineSQLBuilder {
}
@Override
+ public String buildChunkedQuerySQL(final String tableName, final String
uniqueKey, final Number startUniqueValue) {
+ Preconditions.checkNotNull(uniqueKey, "uniqueKey is null");
+ Preconditions.checkNotNull(startUniqueValue, "startUniqueValue is
null");
+ return "SELECT * FROM " + quote(tableName) + " WHERE " +
quote(uniqueKey) + " > ? ORDER BY " + quote(uniqueKey) + " ASC LIMIT ?";
+ }
+
+ @Override
public String buildCheckEmptySQL(final String tableName) {
return String.format("SELECT * FROM %s LIMIT 1", quote(tableName));
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithm.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithm.java
index 5676b27..42d338e 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithm.java
@@ -18,8 +18,6 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import com.google.common.base.Preconditions;
-import lombok.Getter;
-import lombok.Setter;
import
org.apache.shardingsphere.data.pipeline.api.detect.AllIncrementalTasksAlmostFinishedParameter;
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCompletionDetectAlgorithm;
@@ -29,8 +27,6 @@ import java.util.Properties;
/**
* Idle rule altered job completion detect algorithm.
*/
-@Getter
-@Setter
public final class IdleRuleAlteredJobCompletionDetectAlgorithm implements
RuleAlteredJobCompletionDetectAlgorithm {
public static final String IDLE_THRESHOLD_KEY =
"incremental-task-idle-minute-threshold";
@@ -40,6 +36,16 @@ public final class
IdleRuleAlteredJobCompletionDetectAlgorithm implements RuleAl
private long incrementalTaskIdleMinuteThreshold = 30;
@Override
+ public Properties getProps() {
+ return props;
+ }
+
+ @Override
+ public void setProps(final Properties props) {
+ this.props = props;
+ }
+
+ @Override
public void init() {
Preconditions.checkArgument(props.containsKey(IDLE_THRESHOLD_KEY), "%s
can not be null.", IDLE_THRESHOLD_KEY);
incrementalTaskIdleMinuteThreshold =
Long.parseLong(props.getProperty(IDLE_THRESHOLD_KEY));
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCheckAlgorithm.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCheckAlgorithm.java
index 844233e..19a6d9d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCheckAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCheckAlgorithm.java
@@ -23,7 +23,7 @@ import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmP
import java.util.Collection;
/**
- * Data consistency check algorithm for SPI.
+ * Data consistency check algorithm, SPI.
*/
public interface DataConsistencyCheckAlgorithm extends
ShardingSphereAlgorithm, ShardingSphereAlgorithmPostProcessor {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/SingleTableDataCalculator.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/SingleTableDataCalculator.java
index 067b7e5..06223b5 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/SingleTableDataCalculator.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/SingleTableDataCalculator.java
@@ -19,8 +19,11 @@ package
org.apache.shardingsphere.data.pipeline.spi.check.consistency;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
+import java.util.Collection;
+import java.util.Properties;
+
/**
- * Single table data calculator interface for SPI.
+ * Single table data calculator interface, SPI.
* <p>
* SPI implementation will be initialized as new instance every time.
* </p>
@@ -35,18 +38,37 @@ public interface SingleTableDataCalculator {
String getAlgorithmType();
/**
- * Get database type.
+ * Get database types.
+ *
+ * @return database types
+ */
+ Collection<String> getDatabaseTypes();
+
+ /**
+ * Get algorithm properties.
+ *
+ * @return properties
+ */
+ Properties getAlgorithmProps();
+
+ /**
+ * Set algorithm properties.
+ * Used by data consistency check algorithm.
*
- * @return database type
+ * @param algorithmProps algorithm properties
+ */
+ void setAlgorithmProps(Properties algorithmProps);
+
+ /**
+ * Initialize create data calculator.
*/
- // TODO support database types?
- String getDatabaseType();
+ void init();
/**
- * Calculate table data, usually checksum.
+ * Calculate table data content, return checksum typically.
*
* @param dataCalculateParameter data calculate parameter
* @return calculated result, it will be used to check equality.
*/
- Object dataCalculate(DataCalculateParameter dataCalculateParameter);
+ Iterable<Object> calculate(DataCalculateParameter dataCalculateParameter);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 8f14744..6232940 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -84,6 +84,16 @@ public interface PipelineSQLBuilder {
String buildCountSQL(String tableName);
/**
+ * Build query SQL.
+ *
+ * @param tableName table name
+ * @param uniqueKey unique key, it may be primary key, not null
+ * @param startUniqueValue start unique value, not null
+ * @return query SQL
+ */
+ String buildChunkedQuerySQL(String tableName, String uniqueKey, Number
startUniqueValue);
+
+ /**
* Build check empty SQL.
*
* @param tableName table name
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm
index 57ab8e3..078c666 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm
@@ -15,4 +15,5 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.core.check.consistency.DefaultDataConsistencyCheckAlgorithm
+org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.CRC32MatchDataConsistencyCheckAlgorithm
+org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.DataMatchDataConsistencyCheckAlgorithm
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
similarity index 88%
copy from
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
index 89cd902..5749056 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.scaling.mysql.component.checker.DefaultMySQLSingleTableDataCalculator
+org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.DataMatchSingleTableDataCalculator
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
index 08fb878..4ef066f 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
+++
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
@@ -111,7 +111,9 @@
# sourceWritingStopper:
# type: DEFAULT
# dataConsistencyChecker:
-# type: DEFAULT
+# type: DATA_MATCH
+# props:
+# chunk-size: 1000
# checkoutLocker:
# type: DEFAULT
@@ -198,9 +200,20 @@
# default_scaling:
# blockQueueSize: 10000
# workerThread: 40
+# readBatchSize: 1000
+# rateLimiter:
+# type: SOURCE
+# props:
+# qps: 50
# completionDetector:
# type: IDLE
# props:
# incremental-task-idle-minute-threshold: 30
+# sourceWritingStopper:
+# type: DEFAULT
# dataConsistencyChecker:
+# type: DATA_MATCH
+# props:
+# chunk-size: 1000
+# checkoutLocker:
# type: DEFAULT
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/DefaultMySQLSingleTableDataCalculator.java
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/CRC32MatchMySQLSingleTableDataCalculator.java
similarity index 59%
rename from
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/DefaultMySQLSingleTableDataCalculator.java
rename to
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/CRC32MatchMySQLSingleTableDataCalculator.java
index ce7aa65..3b2219eb 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/DefaultMySQLSingleTableDataCalculator.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/CRC32MatchMySQLSingleTableDataCalculator.java
@@ -18,10 +18,9 @@
package org.apache.shardingsphere.scaling.mysql.component.checker;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
-import
org.apache.shardingsphere.data.pipeline.core.check.consistency.AbstractSingleTableDataCalculator;
-import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DefaultDataConsistencyCheckAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.core.exception.DataCheckFailException;
+import
org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.AbstractSingleTableDataCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.CRC32MatchDataConsistencyCheckAlgorithm;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import
org.apache.shardingsphere.scaling.mysql.component.MySQLPipelineSQLBuilder;
@@ -30,48 +29,50 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.stream.Collectors;
/**
- * Default MySQL single table data calculator.
+ * CRC32 match MySQL implementation of single table data calculator.
*/
-public final class DefaultMySQLSingleTableDataCalculator extends
AbstractSingleTableDataCalculator {
+public final class CRC32MatchMySQLSingleTableDataCalculator extends
AbstractSingleTableDataCalculator {
- private static final String DATABASE_TYPE = new
MySQLDatabaseType().getName();
+ private static final Collection<String> DATABASE_TYPES =
Collections.singletonList(new MySQLDatabaseType().getName());
@Override
public String getAlgorithmType() {
- return DefaultDataConsistencyCheckAlgorithm.TYPE;
+ return CRC32MatchDataConsistencyCheckAlgorithm.TYPE;
}
@Override
- public String getDatabaseType() {
- return DATABASE_TYPE;
+ public Collection<String> getDatabaseTypes() {
+ return DATABASE_TYPES;
}
@Override
- public Object dataCalculate(final DataCalculateParameter
dataCalculateParameter) {
+ public Iterable<Object> calculate(final DataCalculateParameter
dataCalculateParameter) {
String logicTableName = dataCalculateParameter.getLogicTableName();
MySQLPipelineSQLBuilder scalingSQLBuilder = new
MySQLPipelineSQLBuilder(new HashMap<>());
- try (DataSourceWrapper dataSource =
getDataSource(dataCalculateParameter.getDataSourceConfig())) {
- return dataCalculateParameter.getColumnNames().stream().map(each
-> {
- String sql =
scalingSQLBuilder.buildSumCrc32SQL(logicTableName, each);
- return sumCrc32(dataSource, sql);
- }).collect(Collectors.toList());
- } catch (final SQLException ex) {
- throw new DataCheckFailException(String.format("table %s data
check failed.", logicTableName), ex);
- }
+ List<Long> result =
dataCalculateParameter.getColumnNames().stream().map(each -> {
+ String sql = scalingSQLBuilder.buildSumCrc32SQL(logicTableName,
each);
+ try {
+ return sumCrc32(dataCalculateParameter.getDataSource(), sql);
+ } catch (final SQLException ex) {
+ throw new DataCheckFailException(String.format("table %s data
check failed.", logicTableName), ex);
+ }
+ }).collect(Collectors.toList());
+ return Collections.unmodifiableList(result);
}
- private long sumCrc32(final DataSource dataSource, final String sql) {
+ private long sumCrc32(final DataSource dataSource, final String sql)
throws SQLException {
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery()) {
resultSet.next();
return resultSet.getLong(1);
- } catch (final SQLException ex) {
- throw new DataCheckFailException(String.format("execute %s
failed.", sql), ex);
}
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
index 89cd902..f05ee7e 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
+++
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.scaling.mysql.component.checker.DefaultMySQLSingleTableDataCalculator
+org.apache.shardingsphere.scaling.mysql.component.checker.CRC32MatchMySQLSingleTableDataCalculator
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/DefaultPostgreSQLSingleTableDataCalculator.java
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/CRC32MatchPostgreSQLSingleTableDataCalculator.java
similarity index 56%
rename from
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/DefaultPostgreSQLSingleTableDataCalculator.java
rename to
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/CRC32MatchPostgreSQLSingleTableDataCalculator.java
index 791dddd..30ee199 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/DefaultPostgreSQLSingleTableDataCalculator.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/CRC32MatchPostgreSQLSingleTableDataCalculator.java
@@ -18,30 +18,33 @@
package org.apache.shardingsphere.scaling.postgresql.component.checker;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
-import
org.apache.shardingsphere.data.pipeline.core.check.consistency.AbstractSingleTableDataCalculator;
-import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DefaultDataConsistencyCheckAlgorithm;
+import
org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.AbstractSingleTableDataCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.CRC32MatchDataConsistencyCheckAlgorithm;
import
org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
+import java.util.Collection;
+import java.util.Collections;
+
/**
- * Default PostgreSQL single table data calculator.
+ * CRC32 match PostgreSQL implementation of single table data calculator.
*/
-public final class DefaultPostgreSQLSingleTableDataCalculator extends
AbstractSingleTableDataCalculator {
+public final class CRC32MatchPostgreSQLSingleTableDataCalculator extends
AbstractSingleTableDataCalculator {
- private static final String DATABASE_TYPE = new
PostgreSQLDatabaseType().getName();
+ private static final Collection<String> DATABASE_TYPES =
Collections.singletonList(new PostgreSQLDatabaseType().getName());
@Override
public String getAlgorithmType() {
- return DefaultDataConsistencyCheckAlgorithm.TYPE;
+ return CRC32MatchDataConsistencyCheckAlgorithm.TYPE;
}
@Override
- public String getDatabaseType() {
- return DATABASE_TYPE;
+ public Collection<String> getDatabaseTypes() {
+ return DATABASE_TYPES;
}
@Override
- public Object dataCalculate(final DataCalculateParameter
dataCalculateParameter) {
- //TODO PostgreSQL dataCalculate
- return true;
+ public Iterable<Object> calculate(final DataCalculateParameter
dataCalculateParameter) {
+ //TODO PostgreSQL calculate
+ return Collections.emptyList();
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
index ad3cf6b..710380b 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
+++
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.scaling.postgresql.component.checker.DefaultPostgreSQLSingleTableDataCalculator
+org.apache.shardingsphere.scaling.postgresql.component.checker.CRC32MatchPostgreSQLSingleTableDataCalculator
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-distsql/shardingsphere-scaling-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CheckScalingQueryResultSet.java
b/shardingsphere-scaling/shardingsphere-scaling-distsql/shardingsphere-scaling-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CheckScalingQueryResultSet.java
index afa89a1..44830e0 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-distsql/shardingsphere-scaling-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CheckScalingQueryResultSet.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-distsql/shardingsphere-scaling-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CheckScalingQueryResultSet.java
@@ -51,17 +51,17 @@ public final class CheckScalingQueryResultSet implements
DistSQLResultSet {
.map(each -> {
Collection<Object> list = new LinkedList<>();
list.add(each.getKey());
- list.add(each.getValue().getSourceCount());
- list.add(each.getValue().getTargetCount());
- list.add(each.getValue().isCountValid() + "");
- list.add(each.getValue().isDataValid() + "");
+ list.add(each.getValue().getSourceRecordsCount());
+ list.add(each.getValue().getTargetRecordsCount());
+ list.add(each.getValue().isRecordsCountMatched() + "");
+ list.add(each.getValue().isRecordsContentMatched() + "");
return list;
}).collect(Collectors.toList()).iterator();
}
@Override
public Collection<String> getColumnNames() {
- return Arrays.asList("table_name", "source_count", "target_count",
"count_valid", "data_valid");
+ return Arrays.asList("table_name", "source_records_count",
"target_records_count", "records_count_matched", "records_content_matched");
}
@Override
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-distsql/shardingsphere-scaling-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingCheckAlgorithmsQueryResultSetTest.java
b/shardingsphere-scaling/shardingsphere-scaling-distsql/shardingsphere-scaling-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingCheckAlgorithmsQueryResultSetTest.java
index a89152d..0061785 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-distsql/shardingsphere-scaling-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingCheckAlgorithmsQueryResultSetTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-distsql/shardingsphere-scaling-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingCheckAlgorithmsQueryResultSetTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.scaling.distsql.handler;
-import org.apache.shardingsphere.infra.distsql.query.DistSQLResultSet;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.scaling.distsql.statement.ShowScalingCheckAlgorithmsStatement;
import org.junit.Test;
@@ -27,9 +26,12 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collection;
import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
@RunWith(MockitoJUnitRunner.class)
public final class ShowScalingCheckAlgorithmsQueryResultSetTest {
@@ -42,14 +44,16 @@ public final class
ShowScalingCheckAlgorithmsQueryResultSetTest {
@Test
public void assertGetRowData() {
- DistSQLResultSet resultSet = new
ShowScalingCheckAlgorithmsQueryResultSet();
+ ShowScalingCheckAlgorithmsQueryResultSet resultSet = new
ShowScalingCheckAlgorithmsQueryResultSet();
resultSet.init(shardingSphereMetaData,
showScalingCheckAlgorithmsStatement);
- Collection<Object> actual = resultSet.getRowData();
- assertThat(actual.size(), is(4));
- Iterator<Object> rowData = actual.iterator();
- assertThat(rowData.next(), is("DEFAULT"));
- assertThat(rowData.next(), is("Default implementation with CRC32 of
all records."));
- assertThat(rowData.next(), is("MySQL"));
- assertThat(rowData.next(), is("ShardingSphere"));
+ Set<Object> algorithmTypes = new LinkedHashSet<>();
+ while (resultSet.next()) {
+ Collection<Object> actual = resultSet.getRowData();
+ assertThat(actual.size(), is(4));
+ Iterator<Object> rowData = actual.iterator();
+ algorithmTypes.add(rowData.next());
+ }
+ assertTrue(algorithmTypes.contains("DATA_MATCH"));
+ assertTrue(algorithmTypes.contains("CRC32_MATCH"));
}
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
index 5ac6109..59cb3e9 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
@@ -152,9 +152,9 @@ public final class PipelineJobAPIImplTest {
initTableData(jobConfig.getRuleConfig());
Map<String, DataConsistencyCheckResult> checkResultMap =
pipelineJobAPI.dataConsistencyCheck(jobId.get(),
FixtureDataConsistencyCheckAlgorithm.TYPE);
assertThat(checkResultMap.size(), is(1));
- assertTrue(checkResultMap.get("t_order").isCountValid());
- assertTrue(checkResultMap.get("t_order").isDataValid());
- assertThat(checkResultMap.get("t_order").getTargetCount(), is(2L));
+ assertTrue(checkResultMap.get("t_order").isRecordsCountMatched());
+ assertTrue(checkResultMap.get("t_order").isRecordsContentMatched());
+ assertThat(checkResultMap.get("t_order").getTargetRecordsCount(),
is(2L));
}
@Test
@@ -164,17 +164,17 @@ public final class PipelineJobAPIImplTest {
checkResultMap = Collections.emptyMap();
assertThat(pipelineJobAPI.aggregateDataConsistencyCheckResults(jobId,
checkResultMap), is(false));
DataConsistencyCheckResult trueResult = new
DataConsistencyCheckResult(1, 1);
- trueResult.setDataValid(true);
+ trueResult.setRecordsContentMatched(true);
DataConsistencyCheckResult checkResult;
checkResult = new DataConsistencyCheckResult(100, 95);
checkResultMap = ImmutableMap.<String,
DataConsistencyCheckResult>builder().put("t", trueResult).put("t_order",
checkResult).build();
assertThat(pipelineJobAPI.aggregateDataConsistencyCheckResults(jobId,
checkResultMap), is(false));
checkResult = new DataConsistencyCheckResult(100, 100);
- checkResult.setDataValid(false);
+ checkResult.setRecordsContentMatched(false);
checkResultMap = ImmutableMap.<String,
DataConsistencyCheckResult>builder().put("t", trueResult).put("t_order",
checkResult).build();
assertThat(pipelineJobAPI.aggregateDataConsistencyCheckResults(jobId,
checkResultMap), is(false));
checkResult = new DataConsistencyCheckResult(100, 100);
- checkResult.setDataValid(true);
+ checkResult.setRecordsContentMatched(true);
checkResultMap = ImmutableMap.<String,
DataConsistencyCheckResult>builder().put("t", trueResult).put("t_order",
checkResult).build();
assertThat(pipelineJobAPI.aggregateDataConsistencyCheckResults(jobId,
checkResultMap), is(true));
}
@@ -188,7 +188,7 @@ public final class PipelineJobAPIImplTest {
initTableData(jobConfig.getRuleConfig());
pipelineJobAPI.reset(jobId.get());
Map<String, DataConsistencyCheckResult> checkResultMap =
pipelineJobAPI.dataConsistencyCheck(jobId.get(),
FixtureDataConsistencyCheckAlgorithm.TYPE);
- assertThat(checkResultMap.get("t_order").getTargetCount(), is(0L));
+ assertThat(checkResultMap.get("t_order").getTargetRecordsCount(),
is(0L));
}
@SneakyThrows(SQLException.class)
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
index b0b41d5..39463ce 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
@@ -49,10 +49,10 @@ public final class DataConsistencyCheckerImplTest {
DataConsistencyChecker dataConsistencyChecker =
EnvironmentCheckerFactory.newInstance(jobContext);
initTableData(jobContext.getTaskConfigs().get(0).getDumperConfig().getDataSourceConfig());
initTableData(jobContext.getTaskConfigs().get(0).getImporterConfig().getDataSourceConfig());
- Map<String, DataConsistencyCheckResult> resultMap =
dataConsistencyChecker.countCheck();
- assertTrue(resultMap.get("t_order").isCountValid());
- assertThat(resultMap.get("t_order").getSourceCount(),
is(resultMap.get("t_order").getTargetCount()));
- Map<String, Boolean> dataCheckResultMap =
dataConsistencyChecker.dataCheck(new FixtureDataConsistencyCheckAlgorithm());
+ Map<String, DataConsistencyCheckResult> resultMap =
dataConsistencyChecker.checkRecordsCount();
+ assertTrue(resultMap.get("t_order").isRecordsCountMatched());
+ assertThat(resultMap.get("t_order").getSourceRecordsCount(),
is(resultMap.get("t_order").getTargetRecordsCount()));
+ Map<String, Boolean> dataCheckResultMap =
dataConsistencyChecker.checkRecordsContent(new
FixtureDataConsistencyCheckAlgorithm());
assertTrue(dataCheckResultMap.get("t_order"));
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceWrapperTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceWrapperTest.java
index 0c6ff2a..8997096 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceWrapperTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceWrapperTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.datasource;
+import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -68,7 +69,7 @@ public final class DataSourceWrapperTest {
@Test
public void assertGetConnection() throws SQLException {
- DataSourceWrapper dataSourceWrapper = new
DataSourceWrapper(dataSource);
+ DataSourceWrapper dataSourceWrapper = new
DataSourceWrapper(dataSource, new H2DatabaseType());
assertThat(dataSourceWrapper.getConnection(), is(connection));
assertThat(dataSourceWrapper.getConnection(CLIENT_USERNAME,
CLIENT_PASSWORD), is(connection));
assertGetLogWriter(dataSourceWrapper.getLogWriter());
@@ -96,24 +97,24 @@ public final class DataSourceWrapperTest {
@Test(expected = SQLException.class)
public void assertSetLoginTimeoutFailure() throws SQLException {
doThrow(new
SQLException("")).when(dataSource).setLoginTimeout(LOGIN_TIMEOUT);
- new DataSourceWrapper(dataSource).setLoginTimeout(LOGIN_TIMEOUT);
+ new DataSourceWrapper(dataSource, new
H2DatabaseType()).setLoginTimeout(LOGIN_TIMEOUT);
}
@Test(expected = SQLException.class)
public void assertSetLogWriterFailure() throws SQLException {
doThrow(new
SQLException("")).when(dataSource).setLogWriter(printWriter);
- new DataSourceWrapper(dataSource).setLogWriter(printWriter);
+ new DataSourceWrapper(dataSource, new
H2DatabaseType()).setLogWriter(printWriter);
}
@Test(expected = SQLException.class)
public void assertCloseExceptionFailure() throws Exception {
doThrow(new Exception("")).when((AutoCloseable) dataSource).close();
- new DataSourceWrapper(dataSource).close();
+ new DataSourceWrapper(dataSource, new H2DatabaseType()).close();
}
@Test(expected = SQLException.class)
public void assertCloseSQLExceptionFailure() throws Exception {
doThrow(new SQLException("")).when((AutoCloseable) dataSource).close();
- new DataSourceWrapper(dataSource).close();
+ new DataSourceWrapper(dataSource, new H2DatabaseType()).close();
}
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureH2SingleTableDataCalculator.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureH2SingleTableDataCalculator.java
index 0c4792d..61a7a14 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureH2SingleTableDataCalculator.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureH2SingleTableDataCalculator.java
@@ -18,9 +18,12 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
-import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.AbstractSingleTableDataCalculator;
-public final class FixtureH2SingleTableDataCalculator implements
SingleTableDataCalculator {
+import java.util.Collection;
+import java.util.Collections;
+
+public final class FixtureH2SingleTableDataCalculator extends
AbstractSingleTableDataCalculator {
@Override
public String getAlgorithmType() {
@@ -28,12 +31,12 @@ public final class FixtureH2SingleTableDataCalculator
implements SingleTableData
}
@Override
- public String getDatabaseType() {
- return "H2";
+ public Collection<String> getDatabaseTypes() {
+ return Collections.singletonList("H2");
}
@Override
- public Object dataCalculate(final DataCalculateParameter
dataCalculateParameter) {
- return true;
+ public Iterable<Object> calculate(final DataCalculateParameter
dataCalculateParameter) {
+ return Collections.singletonList(true);
}
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/ScalingDefaultDataConsistencyCheckAlgorithmTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCheckAlgorithmTest.java
similarity index 76%
rename from
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/ScalingDefaultDataConsistencyCheckAlgorithmTest.java
rename to
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCheckAlgorithmTest.java
index eee406c..68b6129 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/ScalingDefaultDataConsistencyCheckAlgorithmTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCheckAlgorithmTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
import org.junit.Test;
@@ -25,13 +25,13 @@ import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
-public final class ScalingDefaultDataConsistencyCheckAlgorithmTest {
+public final class CRC32MatchDataConsistencyCheckAlgorithmTest {
@Test
public void assertNewInstance() {
- DefaultDataConsistencyCheckAlgorithm checkAlgorithm = new
DefaultDataConsistencyCheckAlgorithm();
+ CRC32MatchDataConsistencyCheckAlgorithm checkAlgorithm = new
CRC32MatchDataConsistencyCheckAlgorithm();
checkAlgorithm.init();
- assertThat(checkAlgorithm.getType(),
is(DefaultDataConsistencyCheckAlgorithm.TYPE));
+ assertThat(checkAlgorithm.getType(),
is(CRC32MatchDataConsistencyCheckAlgorithm.TYPE));
assertNotNull(checkAlgorithm.getDescription());
assertThat(checkAlgorithm.getProvider(), is("ShardingSphere"));
assertThat(checkAlgorithm.getSupportedDatabaseTypes(),
is(Collections.singletonList("MySQL")));
@@ -39,7 +39,7 @@ public final class
ScalingDefaultDataConsistencyCheckAlgorithmTest {
@Test(expected = NullPointerException.class)
public void assertGetSingleTableDataCalculator() {
- DefaultDataConsistencyCheckAlgorithm checkAlgorithm = new
DefaultDataConsistencyCheckAlgorithm();
+ CRC32MatchDataConsistencyCheckAlgorithm checkAlgorithm = new
CRC32MatchDataConsistencyCheckAlgorithm();
checkAlgorithm.getSupportedDatabaseTypes().forEach(checkAlgorithm::getSingleTableDataCalculator);
}
}