This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ab5d911478a Isolate consistency check tableCheckPositions for source and target (#28143)
ab5d911478a is described below
commit ab5d911478a3d6f91e94f237efbab5fb0b9bdac0
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Aug 17 16:27:05 2023 +0800
Isolate consistency check tableCheckPositions for source and target (#28143)
* Refactor tableCheckPositions to sourceTableCheckPositions and targetTableCheckPositions
* Add ConsistencyCheckJobItemContextTest
---
.../progress/ConsistencyCheckJobItemProgress.java | 6 +-
.../yaml/YamlConsistencyCheckJobItemProgress.java | 5 +-
...YamlConsistencyCheckJobItemProgressSwapper.java | 13 ++---
.../ConsistencyCheckJobItemProgressContext.java | 4 +-
.../table/MatchingTableDataConsistencyChecker.java | 10 ++--
.../api/impl/ConsistencyCheckJobAPI.java | 3 +-
.../context/ConsistencyCheckJobItemContext.java | 3 +-
.../ConsistencyCheckJobItemContextTest.java | 65 ++++++++++++++++++++++
.../consistencycheck/ConsistencyCheckJobTest.java | 6 +-
9 files changed, 92 insertions(+), 23 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
index 04950d53919..3b36f63e16c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
@@ -28,10 +28,10 @@ import java.util.Map;
/**
* Data consistency check job item progress.
*/
-// TODO move package
@RequiredArgsConstructor
@Getter
@ToString
+// TODO Refactor structure, List<TableProgress>
public final class ConsistencyCheckJobItemProgress implements
PipelineJobItemProgress {
@Setter
@@ -49,5 +49,7 @@ public final class ConsistencyCheckJobItemProgress implements
PipelineJobItemPro
private final Long checkEndTimeMillis;
- private final Map<String, Object> tableCheckPositions;
+ private final Map<String, Object> sourceTableCheckPositions;
+
+ private final Map<String, Object> targetTableCheckPositions;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
index 3e5b9086484..c7984ca0c4e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
@@ -21,6 +21,7 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import java.util.LinkedHashMap;
import java.util.Map;
/**
@@ -44,5 +45,7 @@ public final class YamlConsistencyCheckJobItemProgress
implements YamlConfigurat
private Long checkEndTimeMillis;
- private Map<String, Object> tableCheckPositions;
+ private Map<String, Object> sourceTableCheckPositions = new
LinkedHashMap<>();
+
+ private Map<String, Object> targetTableCheckPositions = new
LinkedHashMap<>();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
index 96a28986c9d..bece1b9b17c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
@@ -21,9 +21,6 @@ import
org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
/**
* YAML data check job item progress swapper.
*/
@@ -39,18 +36,16 @@ public final class
YamlConsistencyCheckJobItemProgressSwapper implements YamlCon
result.setRecordsCount(data.getRecordsCount());
result.setCheckBeginTimeMillis(data.getCheckBeginTimeMillis());
result.setCheckEndTimeMillis(data.getCheckEndTimeMillis());
- result.setTableCheckPositions(data.getTableCheckPositions());
+
result.setSourceTableCheckPositions(data.getSourceTableCheckPositions());
+
result.setTargetTableCheckPositions(data.getTargetTableCheckPositions());
return result;
}
@Override
public ConsistencyCheckJobItemProgress swapToObject(final
YamlConsistencyCheckJobItemProgress yamlConfig) {
- Map<String, Object> tableCheckPositions = new LinkedHashMap<>();
- if (null != yamlConfig.getTableCheckPositions()) {
- tableCheckPositions.putAll(yamlConfig.getTableCheckPositions());
- }
ConsistencyCheckJobItemProgress result = new
ConsistencyCheckJobItemProgress(yamlConfig.getTableNames(),
yamlConfig.getIgnoredTableNames(), yamlConfig.getCheckedRecordsCount(),
- yamlConfig.getRecordsCount(),
yamlConfig.getCheckBeginTimeMillis(), yamlConfig.getCheckEndTimeMillis(),
tableCheckPositions);
+ yamlConfig.getRecordsCount(),
yamlConfig.getCheckBeginTimeMillis(), yamlConfig.getCheckEndTimeMillis(),
+ yamlConfig.getSourceTableCheckPositions(),
yamlConfig.getTargetTableCheckPositions());
result.setStatus(JobStatus.valueOf(yamlConfig.getStatus()));
return result;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
index f11b5d9c8d4..7822408a81c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
@@ -54,7 +54,9 @@ public final class ConsistencyCheckJobItemProgressContext
implements PipelineJob
private volatile Long checkEndTimeMillis;
- private final Map<String, Object> tableCheckPositions = new
ConcurrentHashMap<>();
+ private final Map<String, Object> sourceTableCheckPositions = new
ConcurrentHashMap<>();
+
+ private final Map<String, Object> targetTableCheckPositions = new
ConcurrentHashMap<>();
@Override
public void onProgressUpdated(final PipelineJobProgressUpdatedParameter
param) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
index b88f7bf2d17..79518e3edb4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
@@ -36,7 +36,6 @@ import
org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import java.sql.SQLException;
import java.util.Iterator;
-import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
@@ -65,11 +64,10 @@ public abstract class MatchingTableDataConsistencyChecker
implements TableDataCo
}
private TableDataConsistencyCheckResult
checkSingleTableInventoryData(final TableDataConsistencyCheckParameter param,
final ThreadPoolExecutor executor) {
- Map<String, Object> tableCheckPositions =
param.getProgressContext().getTableCheckPositions();
SingleTableInventoryCalculateParameter sourceParam = new
SingleTableInventoryCalculateParameter(param.getSourceDataSource(),
param.getSourceTable(),
- param.getColumnNames(), param.getUniqueKeys(),
tableCheckPositions.get(param.getSourceTable().getTableName().getOriginal()));
+ param.getColumnNames(), param.getUniqueKeys(),
param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName().getOriginal()));
SingleTableInventoryCalculateParameter targetParam = new
SingleTableInventoryCalculateParameter(param.getTargetDataSource(),
param.getTargetTable(),
- param.getColumnNames(), param.getUniqueKeys(),
tableCheckPositions.get(param.getTargetTable().getTableName().getOriginal()));
+ param.getColumnNames(), param.getUniqueKeys(),
param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName().getOriginal()));
SingleTableInventoryCalculator calculator =
getSingleTableInventoryCalculator();
Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults
= waitFuture(executor.submit(() ->
calculator.calculate(sourceParam))).iterator();
Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults
= waitFuture(executor.submit(() ->
calculator.calculate(targetParam))).iterator();
@@ -99,10 +97,10 @@ public abstract class MatchingTableDataConsistencyChecker
implements TableDataCo
break;
}
if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-
param.getProgressContext().getTableCheckPositions().put(param.getSourceTable().getTableName().getOriginal(),
sourceCalculatedResult.getMaxUniqueKeyValue().get());
+
param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName().getOriginal(),
sourceCalculatedResult.getMaxUniqueKeyValue().get());
}
if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-
param.getProgressContext().getTableCheckPositions().put(param.getTargetTable().getTableName().getOriginal(),
targetCalculatedResult.getMaxUniqueKeyValue().get());
+
param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName().getOriginal(),
targetCalculatedResult.getMaxUniqueKeyValue().get());
}
param.getProgressContext().onProgressUpdated(new
PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 361d045d6df..6473566fe4f 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -149,7 +149,8 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
String tableNames = String.join(",", progressContext.getTableNames());
String ignoredTableNames = String.join(",",
progressContext.getIgnoredTableNames());
ConsistencyCheckJobItemProgress jobItemProgress = new
ConsistencyCheckJobItemProgress(tableNames, ignoredTableNames,
progressContext.getCheckedRecordsCount().get(),
- progressContext.getRecordsCount(),
progressContext.getCheckBeginTimeMillis(),
progressContext.getCheckEndTimeMillis(),
progressContext.getTableCheckPositions());
+ progressContext.getRecordsCount(),
progressContext.getCheckBeginTimeMillis(),
progressContext.getCheckEndTimeMillis(),
+ progressContext.getSourceTableCheckPositions(),
progressContext.getTargetTableCheckPositions());
jobItemProgress.setStatus(context.getStatus());
return
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress));
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
index fa2bce7cdba..329d9d1e8cb 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
@@ -59,7 +59,8 @@ public final class ConsistencyCheckJobItemContext implements
PipelineJobItemCont
progressContext = new ConsistencyCheckJobItemProgressContext(jobId,
shardingItem);
if (null != jobItemProgress) {
progressContext.getCheckedRecordsCount().set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L));
-
Optional.ofNullable(jobItemProgress.getTableCheckPositions()).ifPresent(progressContext.getTableCheckPositions()::putAll);
+
Optional.ofNullable(jobItemProgress.getSourceTableCheckPositions()).ifPresent(progressContext.getSourceTableCheckPositions()::putAll);
+
Optional.ofNullable(jobItemProgress.getTargetTableCheckPositions()).ifPresent(progressContext.getTargetTableCheckPositions()::putAll);
}
processContext = new ConsistencyCheckProcessContext(jobId);
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
b/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
new file mode 100644
index 00000000000..32f4ac70e18
--- /dev/null
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+
+class ConsistencyCheckJobItemContextTest {
+
+ private static final String TABLE = "t_order";
+
+ @Test
+ void assertConstructWithoutTableCheckPositions() {
+ Map<String, Object> sourceTableCheckPositions = Collections.emptyMap();
+ Map<String, Object> targetTableCheckPositions = Collections.emptyMap();
+ ConsistencyCheckJobItemProgress jobItemProgress = new
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null,
sourceTableCheckPositions, targetTableCheckPositions);
+ ConsistencyCheckJobItemContext actual = new
ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "",
"DATA_MATCH", null), 0, JobStatus.RUNNING, jobItemProgress);
+ verifyProgressContext(actual.getProgressContext(), 0,
sourceTableCheckPositions, targetTableCheckPositions);
+ }
+
+ @Test
+ void assertConstructWithTableCheckPositions() {
+ Map<String, Object> sourceTableCheckPositions = ImmutableMap.of(TABLE,
6);
+ Map<String, Object> targetTableCheckPositions = ImmutableMap.of(TABLE,
5);
+ ConsistencyCheckJobItemProgress jobItemProgress = new
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null,
sourceTableCheckPositions, targetTableCheckPositions);
+ ConsistencyCheckJobItemContext actual = new
ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "",
"DATA_MATCH", null), 0, JobStatus.RUNNING, jobItemProgress);
+ verifyProgressContext(actual.getProgressContext(), 1,
sourceTableCheckPositions, targetTableCheckPositions);
+
assertThat(actual.getProgressContext().getSourceTableCheckPositions().get(TABLE),
is(6));
+
assertThat(actual.getProgressContext().getTargetTableCheckPositions().get(TABLE),
is(5));
+ }
+
+ private void verifyProgressContext(final
ConsistencyCheckJobItemProgressContext progressContext, final int expectedSize,
+ final Map<String, Object>
sourceTableCheckPositions, final Map<String, Object> targetTableCheckPositions)
{
+ assertThat(progressContext.getSourceTableCheckPositions().size(),
is(expectedSize));
+ assertThat(progressContext.getTargetTableCheckPositions().size(),
is(expectedSize));
+ assertNotSame(progressContext.getSourceTableCheckPositions(),
sourceTableCheckPositions);
+ assertNotSame(progressContext.getTargetTableCheckPositions(),
targetTableCheckPositions);
+ }
+}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index c4e90a89381..613a74c9bbf 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -56,13 +56,15 @@ class ConsistencyCheckJobTest {
ConsistencyCheckJob consistencyCheckJob = new
ConsistencyCheckJob(checkJobId);
ConsistencyCheckJobItemContext actual =
consistencyCheckJob.buildPipelineJobItemContext(
new ShardingContext(checkJobId, "", 1,
YamlEngine.marshal(createYamlConsistencyCheckJobConfiguration(checkJobId)), 0,
""));
- assertThat(actual.getProgressContext().getTableCheckPositions(),
is(expectTableCheckPosition));
+ assertThat(actual.getProgressContext().getSourceTableCheckPositions(),
is(expectTableCheckPosition));
+ assertThat(actual.getProgressContext().getTargetTableCheckPositions(),
is(expectTableCheckPosition));
}
private YamlConsistencyCheckJobItemProgress
createYamlConsistencyCheckJobItemProgress(final Map<String, Object>
expectTableCheckPosition) {
YamlConsistencyCheckJobItemProgress result = new
YamlConsistencyCheckJobItemProgress();
result.setStatus(JobStatus.RUNNING.name());
- result.setTableCheckPositions(expectTableCheckPosition);
+ result.setSourceTableCheckPositions(expectTableCheckPosition);
+ result.setTargetTableCheckPositions(expectTableCheckPosition);
return result;
}