This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5575f8bf076 Remove
InventoryIncrementalJobManager.aggregateDataConsistencyCheckResults() (#29094)
5575f8bf076 is described below
commit 5575f8bf0764d84693304e498bc5806b25dcd7e0
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Nov 20 22:15:46 2023 +0800
Remove
InventoryIncrementalJobManager.aggregateDataConsistencyCheckResults() (#29094)
---
.../service/InventoryIncrementalJobManager.java | 14 --------------
.../api/impl/ConsistencyCheckJobAPI.java | 19 +++----------------
.../migration/api/impl/MigrationJobAPITest.java | 22 ----------------------
3 files changed, 3 insertions(+), 52 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
index 2ef78b7e8bf..54ee4ee19ec 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.job.service;
-import com.google.common.base.Preconditions;
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
@@ -30,7 +29,6 @@ import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobO
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
import
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -145,16 +143,4 @@ public final class InventoryIncrementalJobManager {
Optional<String> offsetInfo =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId);
return new
YamlJobOffsetInfoSwapper().swapToObject(offsetInfo.isPresent() ?
YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class) : new
YamlJobOffsetInfo());
}
-
- /**
- * Aggregate data consistency check results.
- *
- * @param jobId job ID
- * @param checkResults check results
- * @return check success or not
- */
- public boolean aggregateDataConsistencyCheckResults(final String jobId,
final Map<String, TableDataConsistencyCheckResult> checkResults) {
- Preconditions.checkArgument(!checkResults.isEmpty(), "checkResults
empty, jobId:", jobId);
- return
checkResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched);
- }
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 63e6ff42436..cf166153d54 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -31,8 +31,6 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPi
import
org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
@@ -47,7 +45,6 @@ import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.Co
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import java.sql.Timestamp;
import java.time.Duration;
@@ -235,9 +232,9 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
result.setErrorMessage(new
PipelineJobItemManager<>(getYamlJobItemProgressSwapper()).getErrorMessage(checkJobId,
0));
- Map<String, TableDataConsistencyCheckResult> checkJobResult =
governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId);
- fillInJobItemInfoWithCheckResult(result, checkJobResult, parentJobId);
-
result.setCheckFailedTableNames(checkJobResult.entrySet().stream().filter(each
-> !each.getValue().isIgnored() && !each.getValue().isMatched())
+ Map<String, TableDataConsistencyCheckResult> checkJobResults =
governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId);
+ result.setCheckSuccess(checkJobResults.isEmpty() ? null :
checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched));
+
result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each
-> !each.getValue().isIgnored() && !each.getValue().isMatched())
.map(Entry::getKey).collect(Collectors.joining(",")));
return result;
}
@@ -275,16 +272,6 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
}
}
- private void fillInJobItemInfoWithCheckResult(final
ConsistencyCheckJobItemInfo result, final Map<String,
TableDataConsistencyCheckResult> checkJobResult, final String parentJobId) {
- if (checkJobResult.isEmpty()) {
- result.setCheckSuccess(null);
- } else {
- InventoryIncrementalJobManager inventoryIncrementalJobManager =
new InventoryIncrementalJobManager(
- (InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(parentJobId).getType()));
-
result.setCheckSuccess(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults(parentJobId,
checkJobResult));
- }
- }
-
@Override
public YamlConsistencyCheckJobConfigurationSwapper
getYamlJobConfigurationSwapper() {
return new YamlConsistencyCheckJobConfigurationSwapper();
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 84a4e55572e..ce9b79e1e49 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -67,7 +67,6 @@ import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -193,27 +192,6 @@ class MigrationJobAPITest {
assertTrue(checkResultMap.get(checkKey).isMatched());
}
- @Test
- void assertAggregateEmptyDataConsistencyCheckResults() {
- assertThrows(IllegalArgumentException.class, () ->
inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job",
Collections.emptyMap()));
- }
-
- @Test
- void assertAggregateDifferentDataConsistencyCheckResults() {
- Map<String, TableDataConsistencyCheckResult> checkResults = new
LinkedHashMap<>(2, 1F);
- checkResults.put("foo_tbl", new TableDataConsistencyCheckResult(true));
- checkResults.put("bar_tbl", new
TableDataConsistencyCheckResult(false));
-
assertFalse(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job",
checkResults));
- }
-
- @Test
- void assertAggregateSameDataConsistencyCheckResults() {
- Map<String, TableDataConsistencyCheckResult> checkResults = new
LinkedHashMap<>(2, 1F);
- checkResults.put("foo_tbl", new TableDataConsistencyCheckResult(true));
- checkResults.put("bar_tbl", new TableDataConsistencyCheckResult(true));
-
assertTrue(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job",
checkResults));
- }
-
@Test
void assertSwitchClusterConfigurationSucceed() {
final MigrationJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();