This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f3f46b90789 Enable concurrent CRC32 match on source and target (#24708)
f3f46b90789 is described below

commit f3f46b90789fba665e7abfcecb23d08c64104fe2
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Mar 21 12:10:28 2023 +0800

    Enable concurrent CRC32 match on source and target (#24708)
---
 .../SingleTableInventoryDataConsistencyChecker.java            | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index b278267f059..3539f7c1b31 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -101,8 +101,8 @@ public final class 
SingleTableInventoryDataConsistencyChecker {
         String targetTableName = targetTable.getTableName().getOriginal();
         DataConsistencyCalculateParameter targetParam = 
buildParameter(targetDataSource, targetTable.getSchemaName().getOriginal(), 
targetTableName,
                 columnNames, targetDatabaseType, sourceDatabaseType, 
uniqueKey, tableCheckPositions.get(targetTableName));
-        Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = 
calculateAlgorithm.calculate(sourceParam).iterator();
-        Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = 
calculateAlgorithm.calculate(targetParam).iterator();
+        Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = 
waitFuture(executor.submit(() -> 
calculateAlgorithm.calculate(sourceParam))).iterator();
+        Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = 
waitFuture(executor.submit(() -> 
calculateAlgorithm.calculate(targetParam))).iterator();
         try {
             return check0(sourceCalculatedResults, targetCalculatedResults, 
executor);
             // CHECKSTYLE:OFF
@@ -127,10 +127,8 @@ public final class 
SingleTableInventoryDataConsistencyChecker {
             if (null != readRateLimitAlgorithm) {
                 readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
             }
-            Future<DataConsistencyCalculatedResult> sourceFuture = 
executor.submit(sourceCalculatedResults::next);
-            Future<DataConsistencyCalculatedResult> targetFuture = 
executor.submit(targetCalculatedResults::next);
-            DataConsistencyCalculatedResult sourceCalculatedResult = 
waitFuture(sourceFuture);
-            DataConsistencyCalculatedResult targetCalculatedResult = 
waitFuture(targetFuture);
+            DataConsistencyCalculatedResult sourceCalculatedResult = 
waitFuture(executor.submit(sourceCalculatedResults::next));
+            DataConsistencyCalculatedResult targetCalculatedResult = 
waitFuture(executor.submit(targetCalculatedResults::next));
             sourceRecordsCount += sourceCalculatedResult.getRecordsCount();
             targetRecordsCount += targetCalculatedResult.getRecordsCount();
             contentMatched = Objects.equals(sourceCalculatedResult, 
targetCalculatedResult);

Reply via email to