This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5dc8fbfebab Extract ConsistencyCheckJobItemProgressContext to decouple ConsistencyCheckJobItemContext (#22541)
5dc8fbfebab is described below
commit 5dc8fbfebab73b7fb368fc96f9a1a8d2f249b0b6
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Dec 1 09:47:28 2022 +0800
Extract ConsistencyCheckJobItemProgressContext to decouple ConsistencyCheckJobItemContext (#22541)
* Remove checkJobItemContext from SingleTableInventoryDataConsistencyChecker
* Extract ConsistencyCheckJobItemProgressContext
* Replace ConsistencyCheckJobItemContext with ConsistencyCheckJobItemProgressContext in API and impl
* Move core classes from scenario module to core module
* Clean unused mock
* Clean unused methods
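In short, the core-module consistency checkers now receive only the progress slice of the check job item context. A condensed sketch of the reshaped API and its scenario-side call site, abbreviated from the diff below:

    // InventoryIncrementalJobAPI (core module) after this change:
    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig,
                                                                 DataConsistencyCalculateAlgorithm calculateAlgorithm,
                                                                 ConsistencyCheckJobItemProgressContext progressContext);

    // ConsistencyCheckTasksRunner (scenario module) now passes only the progress context:
    jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm, jobItemContext.getProgressContext());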
---
.../progress/ConsistencyCheckJobItemProgress.java | 1 +
.../core/api/InventoryIncrementalJobAPI.java | 7 ++--
.../AbstractInventoryIncrementalJobAPIImpl.java | 8 ++---
.../ConsistencyCheckJobItemProgressContext.java} | 41 ++++------------------
...SingleTableInventoryDataConsistencyChecker.java | 7 ++--
.../job/progress/PipelineJobProgressDetector.java | 26 --------------
.../ConsistencyCheckJobAPIImpl.java | 6 ++--
.../ConsistencyCheckJobItemContext.java | 37 ++++---------------
.../ConsistencyCheckTasksRunner.java | 4 +--
.../migration/MigrationDataConsistencyChecker.java | 14 ++++----
.../scenario/migration/MigrationJobAPIImpl.java | 6 ++--
.../core/api/impl/MigrationJobAPIImplTest.java | 10 ++----
.../consistencycheck/ConsistencyCheckJobTest.java | 2 +-
.../MigrationDataConsistencyCheckerTest.java | 12 +++----
14 files changed, 47 insertions(+), 134 deletions(-)
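The scenario-level ConsistencyCheckJobItemContext keeps job configuration and status, and delegates all progress bookkeeping (table names, record counts, check positions, begin/end time) to the new progress context it creates; condensed from its constructor in the diff below:

    // ConsistencyCheckJobItemContext constructor (abbreviated):
    progressContext = new ConsistencyCheckJobItemProgressContext(jobId, shardingItem);
    if (null != jobItemProgress) {
        progressContext.getCheckedRecordsCount().set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L));
        Optional.ofNullable(jobItemProgress.getTableCheckPositions()).ifPresent(progressContext.getTableCheckPositions()::putAll);
    }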
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
index a559023fccd..fd580eea4ba 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
@@ -28,6 +28,7 @@ import java.util.Map;
/**
* Data consistency check job item progress.
*/
+// TODO move package
@Getter
@RequiredArgsConstructor
@ToString
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
similarity index 91%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index adbf8429ca6..901360ef659 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.api;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import java.util.Map;
@@ -57,12 +57,11 @@ public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
*
* @param pipelineJobConfig job configuration
* @param calculateAlgorithm calculate algorithm
- * @param checkJobItemContext consistency check job item context
+ * @param progressContext consistency check job item progress context
* @return each logic table check result
*/
- // TODO do not depend on ConsistencyCheckJobItemContext
Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig, DataConsistencyCalculateAlgorithm calculateAlgorithm,
- ConsistencyCheckJobItemContext checkJobItemContext);
+ ConsistencyCheckJobItemProgressContext progressContext);
/**
* Aggregate data consistency check results.
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
similarity index 97%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index d5f76459eb1..51d34cf1bbc 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -34,6 +34,7 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobI
import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
@@ -42,7 +43,6 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInvent
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -189,16 +189,16 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
@Override
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculateAlgorithm,
- final ConsistencyCheckJobItemContext checkJobItemContext) {
+ final ConsistencyCheckJobItemProgressContext progressContext) {
String jobId = jobConfig.getJobId();
- PipelineDataConsistencyChecker dataConsistencyChecker = buildPipelineDataConsistencyChecker(jobConfig, buildPipelineProcessContext(jobConfig), checkJobItemContext);
+ PipelineDataConsistencyChecker dataConsistencyChecker = buildPipelineDataConsistencyChecker(jobConfig, buildPipelineProcessContext(jobConfig), progressContext);
Map<String, DataConsistencyCheckResult> result = dataConsistencyChecker.check(calculateAlgorithm);
log.info("job {} with check algorithm '{}' data consistency checker result {}", jobId, calculateAlgorithm.getType(), result);
return result;
}
protected abstract PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, InventoryIncrementalProcessContext processContext,
- ConsistencyCheckJobItemContext checkJobItemContext);
+ ConsistencyCheckJobItemProgressContext progressContext);
@Override
public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map<String, DataConsistencyCheckResult> checkResults) {
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/ConsistencyCheckJobItemProgressContext.java
similarity index 54%
copy from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
copy to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/ConsistencyCheckJobItemProgressContext.java
index dd39f9bbed8..733f9285cab 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/ConsistencyCheckJobItemProgressContext.java
@@ -15,74 +15,45 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+package org.apache.shardingsphere.data.pipeline.core.check.consistency;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.Setter;
-import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import java.util.Collection;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
/**
- * Consistency check job item context.
+ * Consistency check job item progress context.
*/
+@RequiredArgsConstructor
@Getter
@Setter
-public final class ConsistencyCheckJobItemContext implements PipelineJobItemContext, PipelineJobProgressListener {
+public final class ConsistencyCheckJobItemProgressContext implements PipelineJobProgressListener {
private final String jobId;
private final int shardingItem;
- private String dataSourceName;
-
- private volatile boolean stopping;
-
- private volatile JobStatus status;
-
private final Collection<String> tableNames = new CopyOnWriteArraySet<>();
private volatile long recordsCount;
private final AtomicLong checkedRecordsCount = new AtomicLong(0);
- private final long checkBeginTimeMillis;
+ private final long checkBeginTimeMillis = System.currentTimeMillis();
private volatile Long checkEndTimeMillis;
private final Map<String, Object> tableCheckPositions = new ConcurrentHashMap<>();
- private final ConsistencyCheckJobConfiguration jobConfig;
-
- public ConsistencyCheckJobItemContext(final ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final JobStatus status, final ConsistencyCheckJobItemProgress jobItemProgress) {
- this.jobConfig = jobConfig;
- jobId = jobConfig.getJobId();
- this.shardingItem = shardingItem;
- this.status = status;
- checkBeginTimeMillis = System.currentTimeMillis();
- if (null != jobItemProgress) {
- checkedRecordsCount.set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L));
- Optional.ofNullable(jobItemProgress.getTableCheckPositions()).ifPresent(tableCheckPositions::putAll);
- }
- }
-
- @Override
- public PipelineProcessContext getJobProcessContext() {
- throw new UnsupportedOperationException();
- }
-
@Override
public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
checkedRecordsCount.addAndGet(param.getProcessedRecordsCount());
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
similarity index 95%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index ac296ecb225..060421536fd 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -33,7 +33,6 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -75,7 +74,7 @@ public final class SingleTableInventoryDataConsistencyChecker {
private final JobRateLimitAlgorithm readRateLimitAlgorithm;
- private final ConsistencyCheckJobItemContext jobItemContext;
+ private final ConsistencyCheckJobItemProgressContext progressContext;
/**
* Data consistency check.
@@ -102,7 +101,7 @@ public final class SingleTableInventoryDataConsistencyChecker {
PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaName, sourceTableName);
ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(schemaName, sourceTableName));
Collection<String> columnNames = tableMetaData.getColumnNames();
- Map<String, Object> tableCheckPositions = jobItemContext.getTableCheckPositions();
+ Map<String, Object> tableCheckPositions = progressContext.getTableCheckPositions();
DataConsistencyCalculateParameter sourceParam = buildParameter(
sourceDataSource, schemaName, sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey, tableCheckPositions.get(sourceTableName));
String targetTableName = targetTable.getTableName().getOriginal();
@@ -134,7 +133,7 @@ public final class SingleTableInventoryDataConsistencyChecker {
if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
tableCheckPositions.put(targetTableName, targetCalculatedResult.getMaxUniqueKeyValue().get());
}
- jobItemContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+ progressContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
}
return new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new DataConsistencyContentCheckResult(contentMatched));
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java
index 011821c2f07..f583b736789 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java
@@ -21,8 +21,6 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import java.util.Collection;
@@ -46,28 +44,4 @@ public final class PipelineJobProgressDetector {
}
return inventoryTasks.stream().allMatch(each -> each.getTaskProgress().getPosition() instanceof FinishedPosition);
}
-
- /**
- * Whether job is completed (successful or failed).
- *
- * @param jobShardingCount job sharding count
- * @param jobItemProgresses job item progresses
- * @return completed or not
- */
- public static boolean isJobCompleted(final int jobShardingCount, final Collection<? extends PipelineJobItemProgress> jobItemProgresses) {
- return jobShardingCount == jobItemProgresses.size()
- && jobItemProgresses.stream().allMatch(each -> null != each && !each.getStatus().isRunning());
- }
-
- /**
- * Whether job is successful.
- *
- * @param jobShardingCount job sharding count
- * @param jobItemProgresses job item progresses
- * @return completed or not
- */
- public static boolean isJobSuccessful(final int jobShardingCount, final Collection<? extends PipelineJobItemProgress> jobItemProgresses) {
- return jobShardingCount == jobItemProgresses.size()
- && jobItemProgresses.stream().allMatch(each -> null != each && JobStatus.FINISHED == each.getStatus());
- }
}
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index 3f17b5b8433..77635c72f06 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -39,6 +39,7 @@ import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -114,8 +115,9 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
@Override
public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
ConsistencyCheckJobItemContext context = (ConsistencyCheckJobItemContext) jobItemContext;
- ConsistencyCheckJobItemProgress jobItemProgress = new ConsistencyCheckJobItemProgress(String.join(",", context.getTableNames()),
- context.getCheckedRecordsCount().get(), context.getRecordsCount(), context.getCheckBeginTimeMillis(), context.getCheckEndTimeMillis(), context.getTableCheckPositions());
+ ConsistencyCheckJobItemProgressContext progressContext = context.getProgressContext();
+ ConsistencyCheckJobItemProgress jobItemProgress = new ConsistencyCheckJobItemProgress(String.join(",", progressContext.getTableNames()), progressContext.getCheckedRecordsCount().get(),
+ progressContext.getRecordsCount(), progressContext.getCheckBeginTimeMillis(), progressContext.getCheckEndTimeMillis(), progressContext.getTableCheckPositions());
jobItemProgress.setStatus(context.getStatus());
YamlConsistencyCheckJobItemProgress yamlJobProgress = swapper.swapToYamlConfiguration(jobItemProgress);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(context.getJobId(), context.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
index dd39f9bbed8..501854a71ca 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
+++ b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
@@ -24,23 +24,16 @@ import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContex
import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
-import java.util.Collection;
-import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicLong;
/**
* Consistency check job item context.
*/
@Getter
@Setter
-public final class ConsistencyCheckJobItemContext implements PipelineJobItemContext, PipelineJobProgressListener {
+public final class ConsistencyCheckJobItemContext implements PipelineJobItemContext {
private final String jobId;
@@ -52,29 +45,19 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
private volatile JobStatus status;
- private final Collection<String> tableNames = new CopyOnWriteArraySet<>();
-
- private volatile long recordsCount;
-
- private final AtomicLong checkedRecordsCount = new AtomicLong(0);
-
- private final long checkBeginTimeMillis;
-
- private volatile Long checkEndTimeMillis;
-
- private final Map<String, Object> tableCheckPositions = new ConcurrentHashMap<>();
-
private final ConsistencyCheckJobConfiguration jobConfig;
+ private final ConsistencyCheckJobItemProgressContext progressContext;
+
public ConsistencyCheckJobItemContext(final ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final JobStatus status, final ConsistencyCheckJobItemProgress jobItemProgress) {
this.jobConfig = jobConfig;
jobId = jobConfig.getJobId();
this.shardingItem = shardingItem;
this.status = status;
- checkBeginTimeMillis = System.currentTimeMillis();
+ progressContext = new ConsistencyCheckJobItemProgressContext(jobId, shardingItem);
if (null != jobItemProgress) {
- checkedRecordsCount.set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L));
- Optional.ofNullable(jobItemProgress.getTableCheckPositions()).ifPresent(tableCheckPositions::putAll);
+ progressContext.getCheckedRecordsCount().set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L));
+ Optional.ofNullable(jobItemProgress.getTableCheckPositions()).ifPresent(progressContext.getTableCheckPositions()::putAll);
}
}
@@ -82,10 +65,4 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
public PipelineProcessContext getJobProcessContext() {
throw new UnsupportedOperationException();
}
-
- @Override
- public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
- checkedRecordsCount.addAndGet(param.getProcessedRecordsCount());
- PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
- }
}
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
index 9859ff5bb12..dfab5bf2383 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
@@ -101,10 +101,10 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
setCalculateAlgorithm(calculateAlgorithm);
Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult;
try {
- dataConsistencyCheckResult = jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm, jobItemContext);
+ dataConsistencyCheckResult = jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm, jobItemContext.getProgressContext());
PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
} finally {
- jobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
+ jobItemContext.getProgressContext().setCheckEndTimeMillis(System.currentTimeMillis());
}
}
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index 24b0eb902c9..506774046f5 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -29,12 +29,12 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.SingleTableInventoryDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -59,15 +59,15 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
- private final ConsistencyCheckJobItemContext checkJobItemContext;
+ private final ConsistencyCheckJobItemProgressContext progressContext;
public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final InventoryIncrementalProcessContext processContext,
- final ConsistencyCheckJobItemContext checkJobItemContext) {
+ final ConsistencyCheckJobItemProgressContext progressContext) {
this.jobConfig = jobConfig;
readRateLimitAlgorithm = null != processContext ? processContext.getReadRateLimitAlgorithm() : null;
tableNameSchemaNameMapping = new TableNameSchemaNameMapping(
TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(), new HashSet<>(Arrays.asList(jobConfig.getSourceTableName(), jobConfig.getTargetTableName()))));
- this.checkJobItemContext = checkJobItemContext;
+ this.progressContext = progressContext;
}
@Override
@@ -80,11 +80,11 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
try (
PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getSource());
PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget())) {
- checkJobItemContext.setRecordsCount(getRecordsCount());
- checkJobItemContext.getTableNames().add(jobConfig.getSourceTableName());
+ progressContext.setRecordsCount(getRecordsCount());
+ progressContext.getTableNames().add(jobConfig.getSourceTableName());
PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(jobConfig.getJobId(), sourceDataSource, targetDataSource,
- sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader, readRateLimitAlgorithm, checkJobItemContext);
+ sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader, readRateLimitAlgorithm, progressContext);
result.put(sourceTable.getTableName().getOriginal(), singleTableInventoryChecker.check(calculateAlgorithm));
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index a9dd585ecc8..2a9c508c921 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -53,6 +53,7 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInf
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIncrementalJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
+import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
@@ -63,7 +64,6 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTabl
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractorFactory;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
@@ -242,8 +242,8 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
@Override
protected PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final InventoryIncrementalProcessContext processContext,
- final ConsistencyCheckJobItemContext checkJobItemContext) {
- return new MigrationDataConsistencyChecker((MigrationJobConfiguration) pipelineJobConfig, processContext, checkJobItemContext);
+ final ConsistencyCheckJobItemProgressContext progressContext) {
+ return new MigrationDataConsistencyChecker((MigrationJobConfiguration) pipelineJobConfig, processContext, progressContext);
}
@Override
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 4f5de0fcd58..75fe65e125b 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -21,7 +21,6 @@ import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
@@ -31,11 +30,11 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
@@ -70,8 +69,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class MigrationJobAPIImplTest {
@@ -156,10 +153,7 @@ public final class MigrationJobAPIImplTest {
Optional<String> jobId = jobAPI.start(jobConfig);
assertTrue(jobId.isPresent());
DataConsistencyCalculateAlgorithm calculateAlgorithm = jobAPI.buildDataConsistencyCalculateAlgorithm(jobConfig, "FIXTURE", null);
- ConsistencyCheckJobConfiguration checkJobConfig = mock(ConsistencyCheckJobConfiguration.class);
- when(checkJobConfig.getJobId()).thenReturn(jobConfig.getJobId() + "1");
- ConsistencyCheckJobItemContext checkJobItemContext = new ConsistencyCheckJobItemContext(checkJobConfig, 0, JobStatus.RUNNING, null);
- Map<String, DataConsistencyCheckResult> checkResultMap = jobAPI.dataConsistencyCheck(jobConfig, calculateAlgorithm, checkJobItemContext);
+ Map<String, DataConsistencyCheckResult> checkResultMap = jobAPI.dataConsistencyCheck(jobConfig, calculateAlgorithm, new ConsistencyCheckJobItemProgressContext(jobId.get(), 0));
assertThat(checkResultMap.size(), is(1));
assertTrue(checkResultMap.get("t_order").getCountCheckResult().isMatched());
assertThat(checkResultMap.get("t_order").getCountCheckResult().getTargetRecordsCount(), is(2L));
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 22c07c2caf0..539254e74e7 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -59,6 +59,6 @@ public final class ConsistencyCheckJobTest {
ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob();
ReflectionUtil.invokeMethodInParentClass(consistencyCheckJob, "setJobId", new Class[]{String.class}, new Object[]{checkJobId});
ConsistencyCheckJobItemContext actualItemContext = consistencyCheckJob.buildPipelineJobItemContext(shardingContext);
- assertThat(actualItemContext.getTableCheckPositions(), is(expectTableCheckPosition));
+ assertThat(actualItemContext.getProgressContext().getTableCheckPositions(), is(expectTableCheckPosition));
}
}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
index c6f494a0fb5..8fa1b6f2f24 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
@@ -18,16 +18,14 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.fixture.DataConsistencyCalculateAlgorithmFixture;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -38,7 +36,6 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
-import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -59,15 +56,14 @@ public final class MigrationDataConsistencyCheckerTest {
PipelineAPIFactory.getGovernanceRepositoryAPI().persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobConfig.getJobId(), 0, "");
Map<String, DataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null),
- createConsistencyCheckJobItemConfig()).check(new DataConsistencyCalculateAlgorithmFixture());
+ createConsistencyCheckJobItemProgressContext()).check(new DataConsistencyCalculateAlgorithmFixture());
assertTrue(actual.get("t_order").getCountCheckResult().isMatched());
assertThat(actual.get("t_order").getCountCheckResult().getSourceRecordsCount(), is(actual.get("t_order").getCountCheckResult().getTargetRecordsCount()));
assertTrue(actual.get("t_order").getContentCheckResult().isMatched());
}
- private ConsistencyCheckJobItemContext createConsistencyCheckJobItemConfig() {
- ConsistencyCheckJobConfiguration jobConfig = new ConsistencyCheckJobConfiguration("", "", "", new Properties());
- return new ConsistencyCheckJobItemContext(jobConfig, 0, JobStatus.RUNNING, null);
+ private ConsistencyCheckJobItemProgressContext createConsistencyCheckJobItemProgressContext() {
+ return new ConsistencyCheckJobItemProgressContext("", 0);
}
private MigrationJobConfiguration createJobConfiguration() throws SQLException {