This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f3f46b90789 Enable concurrent CRC32 match on source and target (#24708)
f3f46b90789 is described below
commit f3f46b90789fba665e7abfcecb23d08c64104fe2
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Mar 21 12:10:28 2023 +0800
Enable concurrent CRC32 match on source and target (#24708)
---
.../SingleTableInventoryDataConsistencyChecker.java | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index b278267f059..3539f7c1b31 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -101,8 +101,8 @@ public final class SingleTableInventoryDataConsistencyChecker {
String targetTableName = targetTable.getTableName().getOriginal();
DataConsistencyCalculateParameter targetParam =
buildParameter(targetDataSource, targetTable.getSchemaName().getOriginal(),
targetTableName,
columnNames, targetDatabaseType, sourceDatabaseType,
uniqueKey, tableCheckPositions.get(targetTableName));
- Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults =
calculateAlgorithm.calculate(sourceParam).iterator();
- Iterator<DataConsistencyCalculatedResult> targetCalculatedResults =
calculateAlgorithm.calculate(targetParam).iterator();
+ Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults =
waitFuture(executor.submit(() ->
calculateAlgorithm.calculate(sourceParam))).iterator();
+ Iterator<DataConsistencyCalculatedResult> targetCalculatedResults =
waitFuture(executor.submit(() ->
calculateAlgorithm.calculate(targetParam))).iterator();
try {
return check0(sourceCalculatedResults, targetCalculatedResults,
executor);
// CHECKSTYLE:OFF
@@ -127,10 +127,8 @@ public final class SingleTableInventoryDataConsistencyChecker {
if (null != readRateLimitAlgorithm) {
readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
}
- Future<DataConsistencyCalculatedResult> sourceFuture =
executor.submit(sourceCalculatedResults::next);
- Future<DataConsistencyCalculatedResult> targetFuture =
executor.submit(targetCalculatedResults::next);
- DataConsistencyCalculatedResult sourceCalculatedResult =
waitFuture(sourceFuture);
- DataConsistencyCalculatedResult targetCalculatedResult =
waitFuture(targetFuture);
+ DataConsistencyCalculatedResult sourceCalculatedResult =
waitFuture(executor.submit(sourceCalculatedResults::next));
+ DataConsistencyCalculatedResult targetCalculatedResult =
waitFuture(executor.submit(targetCalculatedResults::next));
sourceRecordsCount += sourceCalculatedResult.getRecordsCount();
targetRecordsCount += targetCalculatedResult.getRecordsCount();
contentMatched = Objects.equals(sourceCalculatedResult,
targetCalculatedResult);