This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ba99f7  Make scaling source and target data consistency calculation concurrent (#13076)
7ba99f7 is described below

commit 7ba99f7c4dd8fbfcbd394d92f4fd94f0a22f80f4
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Oct 18 19:26:43 2021 +0800

    Make scaling source and target data consistency calculation concurrent (#13076)
    
    * Concurrent countCheck
    
    * Concurrent dataCheck
---
 .../consistency/DataConsistencyCheckerImpl.java    | 53 ++++++++++++++++------
 1 file changed, 40 insertions(+), 13 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java
index eb6773a..a77d993 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java
@@ -21,6 +21,7 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 import 
org.apache.shardingsphere.scaling.core.api.ScalingDataConsistencyCheckAlgorithm;
 import org.apache.shardingsphere.scaling.core.api.SingleTableDataCalculator;
 import 
org.apache.shardingsphere.scaling.core.common.datasource.DataSourceFactory;
@@ -41,6 +42,12 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -58,20 +65,29 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
     
     @Override
     public Map<String, DataConsistencyCheckResult> countCheck() {
-        return jobContext.getTaskConfigs()
-                .stream().flatMap(each -> 
each.getDumperConfig().getTableNameMap().values().stream()).collect(Collectors.toSet())
-                .stream().collect(Collectors.toMap(Function.identity(), 
this::countCheck, (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
+        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" 
+ jobContext.getJobId() % 10_000 + "-countCheck-%d");
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, 
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
+        try {
+            return jobContext.getTaskConfigs()
+                    .stream().flatMap(each -> 
each.getDumperConfig().getTableNameMap().values().stream()).collect(Collectors.toSet())
+                    .stream().collect(Collectors.toMap(Function.identity(), 
table -> countCheck(table, executor), (oldValue, currentValue) -> oldValue, 
LinkedHashMap::new));
+        } finally {
+            executor.shutdown();
+            executor.shutdownNow();
+        }
     }
     
-    private DataConsistencyCheckResult countCheck(final String table) {
+    private DataConsistencyCheckResult countCheck(final String table, final 
ThreadPoolExecutor executor) {
         ScalingDataSourceConfiguration sourceConfig = 
jobContext.getJobConfig().getRuleConfig().getSource().unwrap();
         ScalingDataSourceConfiguration targetConfig = 
jobContext.getJobConfig().getRuleConfig().getTarget().unwrap();
         try (DataSourceWrapper sourceDataSource = 
dataSourceFactory.newInstance(sourceConfig);
              DataSourceWrapper targetDataSource = 
dataSourceFactory.newInstance(targetConfig)) {
-            long sourceCount = count(sourceDataSource, table, 
sourceConfig.getDatabaseType());
-            long targetCount = count(targetDataSource, table, 
targetConfig.getDatabaseType());
+            Future<Long> sourceFuture = executor.submit(() -> 
count(sourceDataSource, table, sourceConfig.getDatabaseType()));
+            Future<Long> targetFuture = executor.submit(() -> 
count(targetDataSource, table, targetConfig.getDatabaseType()));
+            long sourceCount = sourceFuture.get();
+            long targetCount = targetFuture.get();
             return new DataConsistencyCheckResult(sourceCount, targetCount);
-        } catch (final SQLException ex) {
+        } catch (final SQLException | InterruptedException | 
ExecutionException ex) {
             throw new DataCheckFailException(String.format("table %s count 
check failed.", table), ex);
         }
     }
@@ -106,12 +122,23 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
         SingleTableDataCalculator sourceCalculator = 
checkAlgorithm.getSingleTableDataCalculator(sourceConfig.getDatabaseType().getName());
         SingleTableDataCalculator targetCalculator = 
checkAlgorithm.getSingleTableDataCalculator(targetConfig.getDatabaseType().getName());
         Map<String, Boolean> result = new HashMap<>();
-        for (String each : logicTableNames) {
-            Collection<String> columnNames = tablesColumnNamesMap.get(each);
-            Object sourceCalculateResult = 
sourceCalculator.dataCalculate(sourceConfig, each, columnNames);
-            Object targetCalculateResult = 
targetCalculator.dataCalculate(targetConfig, each, columnNames);
-            boolean calculateResultsEquals = 
Objects.equals(sourceCalculateResult, targetCalculateResult);
-            result.put(each, calculateResultsEquals);
+        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" 
+ jobContext.getJobId() % 10_000 + "-dataCheck-%d");
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, 
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
+        try {
+            for (String each : logicTableNames) {
+                Collection<String> columnNames = 
tablesColumnNamesMap.get(each);
+                Future<Object> sourceFuture = executor.submit(() -> 
sourceCalculator.dataCalculate(sourceConfig, each, columnNames));
+                Future<Object> targetFuture = executor.submit(() -> 
targetCalculator.dataCalculate(targetConfig, each, columnNames));
+                Object sourceCalculateResult = sourceFuture.get();
+                Object targetCalculateResult = targetFuture.get();
+                boolean calculateResultsEquals = 
Objects.equals(sourceCalculateResult, targetCalculateResult);
+                result.put(each, calculateResultsEquals);
+            }
+        } catch (final ExecutionException | InterruptedException ex) {
+            throw new DataCheckFailException("data check failed");
+        } finally {
+            executor.shutdown();
+            executor.shutdownNow();
         }
         return result;
     }

Reply via email to