This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ab5d911478a Isolate consistency check tableCheckPositions for source 
and target (#28143)
ab5d911478a is described below

commit ab5d911478a3d6f91e94f237efbab5fb0b9bdac0
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Aug 17 16:27:05 2023 +0800

    Isolate consistency check tableCheckPositions for source and target (#28143)
    
    * Refactor tableCheckPositions to sourceTableCheckPositions and 
targetTableCheckPositions
    
    * Add ConsistencyCheckJobItemContextTest
---
 .../progress/ConsistencyCheckJobItemProgress.java  |  6 +-
 .../yaml/YamlConsistencyCheckJobItemProgress.java  |  5 +-
 ...YamlConsistencyCheckJobItemProgressSwapper.java | 13 ++---
 .../ConsistencyCheckJobItemProgressContext.java    |  4 +-
 .../table/MatchingTableDataConsistencyChecker.java | 10 ++--
 .../api/impl/ConsistencyCheckJobAPI.java           |  3 +-
 .../context/ConsistencyCheckJobItemContext.java    |  3 +-
 .../ConsistencyCheckJobItemContextTest.java        | 65 ++++++++++++++++++++++
 .../consistencycheck/ConsistencyCheckJobTest.java  |  6 +-
 9 files changed, 92 insertions(+), 23 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
index 04950d53919..3b36f63e16c 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
@@ -28,10 +28,10 @@ import java.util.Map;
 /**
  * Data consistency check job item progress.
  */
-// TODO move package
 @RequiredArgsConstructor
 @Getter
 @ToString
+// TODO Refactor structure, List<TableProgress>
 public final class ConsistencyCheckJobItemProgress implements 
PipelineJobItemProgress {
     
     @Setter
@@ -49,5 +49,7 @@ public final class ConsistencyCheckJobItemProgress implements 
PipelineJobItemPro
     
     private final Long checkEndTimeMillis;
     
-    private final Map<String, Object> tableCheckPositions;
+    private final Map<String, Object> sourceTableCheckPositions;
+    
+    private final Map<String, Object> targetTableCheckPositions;
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
index 3e5b9086484..c7984ca0c4e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
@@ -21,6 +21,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 /**
@@ -44,5 +45,7 @@ public final class YamlConsistencyCheckJobItemProgress 
implements YamlConfigurat
     
     private Long checkEndTimeMillis;
     
-    private Map<String, Object> tableCheckPositions;
+    private Map<String, Object> sourceTableCheckPositions = new 
LinkedHashMap<>();
+    
+    private Map<String, Object> targetTableCheckPositions = new 
LinkedHashMap<>();
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
index 96a28986c9d..bece1b9b17c 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
@@ -21,9 +21,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
-import java.util.LinkedHashMap;
-import java.util.Map;
-
 /**
  * YAML data check job item progress swapper.
  */
@@ -39,18 +36,16 @@ public final class 
YamlConsistencyCheckJobItemProgressSwapper implements YamlCon
         result.setRecordsCount(data.getRecordsCount());
         result.setCheckBeginTimeMillis(data.getCheckBeginTimeMillis());
         result.setCheckEndTimeMillis(data.getCheckEndTimeMillis());
-        result.setTableCheckPositions(data.getTableCheckPositions());
+        
result.setSourceTableCheckPositions(data.getSourceTableCheckPositions());
+        
result.setTargetTableCheckPositions(data.getTargetTableCheckPositions());
         return result;
     }
     
     @Override
     public ConsistencyCheckJobItemProgress swapToObject(final 
YamlConsistencyCheckJobItemProgress yamlConfig) {
-        Map<String, Object> tableCheckPositions = new LinkedHashMap<>();
-        if (null != yamlConfig.getTableCheckPositions()) {
-            tableCheckPositions.putAll(yamlConfig.getTableCheckPositions());
-        }
         ConsistencyCheckJobItemProgress result = new 
ConsistencyCheckJobItemProgress(yamlConfig.getTableNames(), 
yamlConfig.getIgnoredTableNames(), yamlConfig.getCheckedRecordsCount(),
-                yamlConfig.getRecordsCount(), 
yamlConfig.getCheckBeginTimeMillis(), yamlConfig.getCheckEndTimeMillis(), 
tableCheckPositions);
+                yamlConfig.getRecordsCount(), 
yamlConfig.getCheckBeginTimeMillis(), yamlConfig.getCheckEndTimeMillis(),
+                yamlConfig.getSourceTableCheckPositions(), 
yamlConfig.getTargetTableCheckPositions());
         result.setStatus(JobStatus.valueOf(yamlConfig.getStatus()));
         return result;
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
index f11b5d9c8d4..7822408a81c 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
@@ -54,7 +54,9 @@ public final class ConsistencyCheckJobItemProgressContext 
implements PipelineJob
     
     private volatile Long checkEndTimeMillis;
     
-    private final Map<String, Object> tableCheckPositions = new 
ConcurrentHashMap<>();
+    private final Map<String, Object> sourceTableCheckPositions = new 
ConcurrentHashMap<>();
+    
+    private final Map<String, Object> targetTableCheckPositions = new 
ConcurrentHashMap<>();
     
     @Override
     public void onProgressUpdated(final PipelineJobProgressUpdatedParameter 
param) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
index b88f7bf2d17..79518e3edb4 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
@@ -36,7 +36,6 @@ import 
org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.sql.SQLException;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
@@ -65,11 +64,10 @@ public abstract class MatchingTableDataConsistencyChecker 
implements TableDataCo
     }
     
     private TableDataConsistencyCheckResult 
checkSingleTableInventoryData(final TableDataConsistencyCheckParameter param, 
final ThreadPoolExecutor executor) {
-        Map<String, Object> tableCheckPositions = 
param.getProgressContext().getTableCheckPositions();
         SingleTableInventoryCalculateParameter sourceParam = new 
SingleTableInventoryCalculateParameter(param.getSourceDataSource(), 
param.getSourceTable(),
-                param.getColumnNames(), param.getUniqueKeys(), 
tableCheckPositions.get(param.getSourceTable().getTableName().getOriginal()));
+                param.getColumnNames(), param.getUniqueKeys(), 
param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName().getOriginal()));
         SingleTableInventoryCalculateParameter targetParam = new 
SingleTableInventoryCalculateParameter(param.getTargetDataSource(), 
param.getTargetTable(),
-                param.getColumnNames(), param.getUniqueKeys(), 
tableCheckPositions.get(param.getTargetTable().getTableName().getOriginal()));
+                param.getColumnNames(), param.getUniqueKeys(), 
param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName().getOriginal()));
         SingleTableInventoryCalculator calculator = 
getSingleTableInventoryCalculator();
         Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults 
= waitFuture(executor.submit(() -> 
calculator.calculate(sourceParam))).iterator();
         Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults 
= waitFuture(executor.submit(() -> 
calculator.calculate(targetParam))).iterator();
@@ -99,10 +97,10 @@ public abstract class MatchingTableDataConsistencyChecker 
implements TableDataCo
                 break;
             }
             if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-                
param.getProgressContext().getTableCheckPositions().put(param.getSourceTable().getTableName().getOriginal(),
 sourceCalculatedResult.getMaxUniqueKeyValue().get());
+                
param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName().getOriginal(),
 sourceCalculatedResult.getMaxUniqueKeyValue().get());
             }
             if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-                
param.getProgressContext().getTableCheckPositions().put(param.getTargetTable().getTableName().getOriginal(),
 targetCalculatedResult.getMaxUniqueKeyValue().get());
+                
param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName().getOriginal(),
 targetCalculatedResult.getMaxUniqueKeyValue().get());
             }
             param.getProgressContext().onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
         }
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 361d045d6df..6473566fe4f 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -149,7 +149,8 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
         String tableNames = String.join(",", progressContext.getTableNames());
         String ignoredTableNames = String.join(",", 
progressContext.getIgnoredTableNames());
         ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress(tableNames, ignoredTableNames, 
progressContext.getCheckedRecordsCount().get(),
-                progressContext.getRecordsCount(), 
progressContext.getCheckBeginTimeMillis(), 
progressContext.getCheckEndTimeMillis(), 
progressContext.getTableCheckPositions());
+                progressContext.getRecordsCount(), 
progressContext.getCheckBeginTimeMillis(), 
progressContext.getCheckEndTimeMillis(),
+                progressContext.getSourceTableCheckPositions(), 
progressContext.getTargetTableCheckPositions());
         jobItemProgress.setStatus(context.getStatus());
         return 
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress));
     }
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
index fa2bce7cdba..329d9d1e8cb 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
@@ -59,7 +59,8 @@ public final class ConsistencyCheckJobItemContext implements 
PipelineJobItemCont
         progressContext = new ConsistencyCheckJobItemProgressContext(jobId, 
shardingItem);
         if (null != jobItemProgress) {
             
progressContext.getCheckedRecordsCount().set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L));
-            
Optional.ofNullable(jobItemProgress.getTableCheckPositions()).ifPresent(progressContext.getTableCheckPositions()::putAll);
+            
Optional.ofNullable(jobItemProgress.getSourceTableCheckPositions()).ifPresent(progressContext.getSourceTableCheckPositions()::putAll);
+            
Optional.ofNullable(jobItemProgress.getTargetTableCheckPositions()).ifPresent(progressContext.getTargetTableCheckPositions()::putAll);
         }
         processContext = new ConsistencyCheckProcessContext(jobId);
     }
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
new file mode 100644
index 00000000000..32f4ac70e18
--- /dev/null
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+
+class ConsistencyCheckJobItemContextTest {
+    
+    private static final String TABLE = "t_order";
+    
+    @Test
+    void assertConstructWithoutTableCheckPositions() {
+        Map<String, Object> sourceTableCheckPositions = Collections.emptyMap();
+        Map<String, Object> targetTableCheckPositions = Collections.emptyMap();
+        ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null, 
sourceTableCheckPositions, targetTableCheckPositions);
+        ConsistencyCheckJobItemContext actual = new 
ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "", 
"DATA_MATCH", null), 0, JobStatus.RUNNING, jobItemProgress);
+        verifyProgressContext(actual.getProgressContext(), 0, 
sourceTableCheckPositions, targetTableCheckPositions);
+    }
+    
+    @Test
+    void assertConstructWithTableCheckPositions() {
+        Map<String, Object> sourceTableCheckPositions = ImmutableMap.of(TABLE, 
6);
+        Map<String, Object> targetTableCheckPositions = ImmutableMap.of(TABLE, 
5);
+        ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null, 
sourceTableCheckPositions, targetTableCheckPositions);
+        ConsistencyCheckJobItemContext actual = new 
ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "", 
"DATA_MATCH", null), 0, JobStatus.RUNNING, jobItemProgress);
+        verifyProgressContext(actual.getProgressContext(), 1, 
sourceTableCheckPositions, targetTableCheckPositions);
+        
assertThat(actual.getProgressContext().getSourceTableCheckPositions().get(TABLE),
 is(6));
+        
assertThat(actual.getProgressContext().getTargetTableCheckPositions().get(TABLE),
 is(5));
+    }
+    
+    private void verifyProgressContext(final 
ConsistencyCheckJobItemProgressContext progressContext, final int expectedSize,
+                                       final Map<String, Object> 
sourceTableCheckPositions, final Map<String, Object> targetTableCheckPositions) 
{
+        assertThat(progressContext.getSourceTableCheckPositions().size(), 
is(expectedSize));
+        assertThat(progressContext.getTargetTableCheckPositions().size(), 
is(expectedSize));
+        assertNotSame(progressContext.getSourceTableCheckPositions(), 
sourceTableCheckPositions);
+        assertNotSame(progressContext.getTargetTableCheckPositions(), 
targetTableCheckPositions);
+    }
+}
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index c4e90a89381..613a74c9bbf 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -56,13 +56,15 @@ class ConsistencyCheckJobTest {
         ConsistencyCheckJob consistencyCheckJob = new 
ConsistencyCheckJob(checkJobId);
         ConsistencyCheckJobItemContext actual = 
consistencyCheckJob.buildPipelineJobItemContext(
                 new ShardingContext(checkJobId, "", 1, 
YamlEngine.marshal(createYamlConsistencyCheckJobConfiguration(checkJobId)), 0, 
""));
-        assertThat(actual.getProgressContext().getTableCheckPositions(), 
is(expectTableCheckPosition));
+        assertThat(actual.getProgressContext().getSourceTableCheckPositions(), 
is(expectTableCheckPosition));
+        assertThat(actual.getProgressContext().getTargetTableCheckPositions(), 
is(expectTableCheckPosition));
     }
     
     private YamlConsistencyCheckJobItemProgress 
createYamlConsistencyCheckJobItemProgress(final Map<String, Object> 
expectTableCheckPosition) {
         YamlConsistencyCheckJobItemProgress result = new 
YamlConsistencyCheckJobItemProgress();
         result.setStatus(JobStatus.RUNNING.name());
-        result.setTableCheckPositions(expectTableCheckPosition);
+        result.setSourceTableCheckPositions(expectTableCheckPosition);
+        result.setTargetTableCheckPositions(expectTableCheckPosition);
         return result;
     }
     

Reply via email to