This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1fb87668517 Data consistency check job support breakpoint resume transmission (#22048)
1fb87668517 is described below
commit 1fb8766851700ec17f2d4b2967a63bc42985288c
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Nov 11 11:29:04 2022 +0800
Data consistency check job support breakpoint resume transmission (#22048)
* Data consistency check job: add check position, support starting execution from the middle of the task
* Fix codestyle
* Revise comment
* Add more unit tests
* Add check records count init
* Fix CI error
* Rename package
* Add unit tests
---
.../DataConsistencyCalculateParameter.java | 2 +
.../DataConsistencyCalculatedResult.java | 9 ++
.../progress/ConsistencyCheckJobItemProgress.java | 19 +--
...SingleTableInventoryDataConsistencyChecker.java | 31 +++--
...RC32MatchDataConsistencyCalculateAlgorithm.java | 6 +
...DataMatchDataConsistencyCalculateAlgorithm.java | 18 ++-
.../yaml/YamlConsistencyCheckJobItemProgress.java | 4 +
...YamlConsistencyCheckJobItemProgressSwapper.java | 20 ++--
.../data/pipeline/core/util/ReflectionUtil.java | 29 +++++
.../consistencycheck/ConsistencyCheckJob.java | 4 +-
.../ConsistencyCheckJobAPIImpl.java | 8 +-
.../ConsistencyCheckJobItemContext.java | 12 +-
...MatchDataConsistencyCalculateAlgorithmTest.java | 3 +-
.../pipeline/core/util/ReflectionUtilTest.java | 17 ++-
.../cases/migration/AbstractMigrationITCase.java | 1 +
.../primarykey/TextPrimaryKeyMigrationIT.java | 7 +-
.../core/api/impl/MigrationJobAPIImplTest.java | 2 +-
...MatchDataConsistencyCalculateAlgorithmTest.java | 128 +++++++++++++++++++++
.../FixtureDataConsistencyCalculatedResult.java | 7 ++
.../consistencycheck/ConsistencyCheckJobTest.java | 64 +++++++++++
.../MigrationDataConsistencyCheckerTest.java | 2 +-
21 files changed, 345 insertions(+), 48 deletions(-)
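In short, the check job now persists a per-table check position (the max unique key value reached so far) in its job item progress, and a restarted check resumes chunked querying from that position instead of from the first row. The sketch below only illustrates that idea; the class and method names (BreakpointResumeSketch, checkTable, calculateChunk) are hypothetical and not part of the ShardingSphere API.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class BreakpointResumeSketch {
    
    /** Persisted per-table check positions, analogous to tableCheckPositions in the job item progress. */
    private final Map<String, Object> tableCheckPositions = new LinkedHashMap<>();
    
    void checkTable(final String tableName) {
        // Resume from the persisted position if one exists, otherwise start from the beginning.
        Object position = tableCheckPositions.get(tableName);
        while (true) {
            Optional<Object> maxUniqueKeyValue = calculateChunk(tableName, position);
            if (!maxUniqueKeyValue.isPresent()) {
                break;
            }
            // Remember how far the check got, so a restarted job can continue from here.
            position = maxUniqueKeyValue.get();
            tableCheckPositions.put(tableName, position);
        }
    }
    
    private Optional<Object> calculateChunk(final String tableName, final Object positionExclusive) {
        // Placeholder: a real implementation queries "unique key > positionExclusive" with a LIMIT
        // and returns the max unique key value of the fetched chunk, or empty when no rows remain.
        return Optional.empty();
    }
}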
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
index 154643bdd59..04d9c757adf 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
@@ -73,4 +73,6 @@ public final class DataConsistencyCalculateParameter {
* Previous calculated result will be transferred to next call.
*/
private volatile Object previousCalculatedResult;
+
+ private final Object tableCheckPosition;
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
index ea34f78d60e..c5d4587ed02 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
@@ -17,6 +17,8 @@
package org.apache.shardingsphere.data.pipeline.api.check.consistency;
+import java.util.Optional;
+
/**
* Data consistency calculated result.
*/
@@ -28,4 +30,11 @@ public interface DataConsistencyCalculatedResult {
* @return records count
*/
int getRecordsCount();
+
+ /**
+ * Get max unique key value.
+ *
+ * @return max unique key value
+ */
+ Optional<Object> getMaxUniqueKeyValue();
}
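The new method lets an algorithm expose how far a calculated chunk reached, so the checker can record it as a resume position. A minimal, hypothetical implementation of the extended interface is sketched below (class and field names are illustrative; the real implementations are in the algorithm classes changed further down):

import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;

import java.util.Optional;

public final class ExampleCalculatedResult implements DataConsistencyCalculatedResult {
    
    private final int recordsCount;
    
    private final Object maxUniqueKeyValue;
    
    public ExampleCalculatedResult(final int recordsCount, final Object maxUniqueKeyValue) {
        this.recordsCount = recordsCount;
        this.maxUniqueKeyValue = maxUniqueKeyValue;
    }
    
    @Override
    public int getRecordsCount() {
        return recordsCount;
    }
    
    @Override
    public Optional<Object> getMaxUniqueKeyValue() {
        // Empty means the algorithm can not provide a resume position for this chunk.
        return Optional.ofNullable(maxUniqueKeyValue);
    }
}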
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
index dcd0195bfa3..a559023fccd 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
@@ -18,28 +18,33 @@
package org.apache.shardingsphere.data.pipeline.api.job.progress;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import java.util.Map;
+
/**
* Data consistency check job item progress.
*/
-// TODO use final for fields
@Getter
-@Setter
+@RequiredArgsConstructor
@ToString
public final class ConsistencyCheckJobItemProgress implements PipelineJobItemProgress {
+ @Setter
private JobStatus status = JobStatus.RUNNING;
- private String tableNames;
+ private final String tableNames;
+
+ private final Long checkedRecordsCount;
- private Long checkedRecordsCount;
+ private final Long recordsCount;
- private Long recordsCount;
+ private final Long checkBeginTimeMillis;
- private Long checkBeginTimeMillis;
+ private final Long checkEndTimeMillis;
- private Long checkEndTimeMillis;
+ private final Map<String, Object> tableCheckPositions;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index bf9aed28113..899c0f17ceb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -26,7 +26,6 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
@@ -34,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -43,6 +43,7 @@ import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.
import java.sql.SQLException;
import java.util.Collection;
import java.util.Iterator;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
@@ -74,7 +75,7 @@ public final class SingleTableInventoryDataConsistencyChecker {
private final JobRateLimitAlgorithm readRateLimitAlgorithm;
- private final PipelineJobProgressListener jobProgressListener;
+ private final ConsistencyCheckJobItemContext jobItemContext;
/**
* Data consistency check.
@@ -86,15 +87,14 @@ public final class SingleTableInventoryDataConsistencyChecker {
ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobId) + "-check-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
try {
- return check(calculateAlgorithm, executor, jobProgressListener);
+ return check(calculateAlgorithm, executor);
} finally {
executor.shutdown();
executor.shutdownNow();
}
}
- private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor executor,
- final PipelineJobProgressListener jobProgressListener) {
+ private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor executor) {
String sourceDatabaseType = sourceDataSource.getDatabaseType().getType();
String targetDatabaseType = targetDataSource.getDatabaseType().getType();
String schemaName = sourceTable.getSchemaName().getOriginal();
@@ -102,10 +102,14 @@ public final class SingleTableInventoryDataConsistencyChecker {
PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaName, sourceTableName);
ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(schemaName, sourceTableName));
Collection<String> columnNames = tableMetaData.getColumnNames();
+ Map<String, Object> tableCheckPositions = jobItemContext.getTableCheckPositions();
DataConsistencyCalculateParameter sourceParameter = buildParameter(
- sourceDataSource, schemaName, sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
+ sourceDataSource, schemaName, sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey,
+ tableCheckPositions.get(sourceTableName));
+ String targetTableName = targetTable.getTableName().getOriginal();
DataConsistencyCalculateParameter targetParameter = buildParameter(
- targetDataSource, targetTable.getSchemaName().getOriginal(), targetTable.getTableName().getOriginal(), columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
+ targetDataSource, targetTable.getSchemaName().getOriginal(), targetTableName, columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey,
+ tableCheckPositions.get(targetTableName));
Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = calculateAlgorithm.calculate(sourceParameter).iterator();
Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = calculateAlgorithm.calculate(targetParameter).iterator();
long sourceRecordsCount = 0;
@@ -126,9 +130,13 @@ public final class SingleTableInventoryDataConsistencyChecker {
log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
break;
}
- if (null != jobProgressListener) {
- jobProgressListener.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+ if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
+ jobItemContext.getTableCheckPositions().put(sourceTableName, sourceCalculatedResult.getMaxUniqueKeyValue().get());
}
+ if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
+ jobItemContext.getTableCheckPositions().put(targetTableName, targetCalculatedResult.getMaxUniqueKeyValue().get());
+ }
+ jobItemContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
}
return new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new DataConsistencyContentCheckResult(contentMatched));
}
@@ -140,8 +148,9 @@ public final class SingleTableInventoryDataConsistencyChecker {
private DataConsistencyCalculateParameter buildParameter(final PipelineDataSourceWrapper sourceDataSource,
final String schemaName, final String tableName, final Collection<String> columnNames,
- final String sourceDatabaseType, final String targetDatabaseType, final PipelineColumnMetaData uniqueKey) {
- return new DataConsistencyCalculateParameter(sourceDataSource, schemaName, tableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
+ final String sourceDatabaseType, final String targetDatabaseType, final PipelineColumnMetaData uniqueKey,
+ final Object tableCheckPositionValue) {
+ return new DataConsistencyCalculateParameter(sourceDataSource, schemaName, tableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey, tableCheckPositionValue);
}
private <T> T waitFuture(final Future<T> future) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index fa605fbec02..aefc709ebb0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -141,5 +141,11 @@ public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractD
result = 31 * result + columnsCrc32.hashCode();
return result;
}
+
+ // TODO not support now
+ @Override
+ public Optional<Object> getMaxUniqueKeyValue() {
+ return Optional.empty();
+ }
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 69f2f40d15b..676c3c71fcd 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -96,10 +96,16 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
Connection connection = parameter.getDataSource().getConnection();
PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql))) {
preparedStatement.setFetchSize(chunkSize);
+ Object tableCheckPosition = parameter.getTableCheckPosition();
if (null == previousCalculatedResult) {
- preparedStatement.setInt(1, chunkSize);
+ if (null == tableCheckPosition) {
+ preparedStatement.setInt(1, chunkSize);
+ } else {
+ preparedStatement.setObject(1, tableCheckPosition);
+ preparedStatement.setInt(2, chunkSize);
+ }
} else {
- preparedStatement.setObject(1, previousCalculatedResult.getMaxUniqueKeyValue());
+ preparedStatement.setObject(1, previousCalculatedResult.getMaxUniqueKeyValue().orElse(null));
preparedStatement.setInt(2, chunkSize);
}
Collection<Collection<Object>> records = new LinkedList<>();
@@ -134,7 +140,7 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
String cacheKey = parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
? schemaName + "." + logicTableName
: logicTableName);
- if (null == parameter.getPreviousCalculatedResult()) {
+ if (null == parameter.getPreviousCalculatedResult() && null == parameter.getTableCheckPosition()) {
return firstSQLCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQuerySQL(schemaName, logicTableName, uniqueKey, true));
}
return laterSQLCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQuerySQL(schemaName, logicTableName, uniqueKey, false));
@@ -166,6 +172,10 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
private final Collection<Collection<Object>> records;
+ public Optional<Object> getMaxUniqueKeyValue() {
+ return Optional.of(maxUniqueKeyValue);
+ }
+
@SneakyThrows(SQLException.class)
@Override
public boolean equals(final Object o) {
@@ -224,7 +234,7 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
@Override
public int hashCode() {
- return new HashCodeBuilder(17, 37).append(getMaxUniqueKeyValue()).append(getRecordsCount()).append(getRecords()).toHashCode();
+ return new HashCodeBuilder(17, 37).append(getMaxUniqueKeyValue().orElse(null)).append(getRecordsCount()).append(getRecords()).toHashCode();
}
}
}
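The binding branch above pairs with two chunked query shapes: a first-chunk query with only a LIMIT placeholder, and a resumable query that also filters on the unique key. The SQL below is illustrative only (the real statements come from the dialect-specific pipeline SQL builder, and the table and column names are placeholders); it shows how a persisted check position is bound the same way as a previous chunk's max unique key value:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class ChunkedQuerySketch {
    
    // Illustrative SQL; the real statements are produced by the dialect-specific pipeline SQL builder.
    private static final String FIRST_SQL = "SELECT * FROM t_order ORDER BY order_id ASC LIMIT ?";
    
    private static final String LATER_SQL = "SELECT * FROM t_order WHERE order_id > ? ORDER BY order_id ASC LIMIT ?";
    
    static PreparedStatement prepareChunkQuery(final Connection connection, final Object previousMaxUniqueKeyValue,
                                               final Object tableCheckPosition, final int chunkSize) throws SQLException {
        if (null == previousMaxUniqueKeyValue && null == tableCheckPosition) {
            // Fresh check, first chunk: only the chunk size is bound.
            PreparedStatement result = connection.prepareStatement(FIRST_SQL);
            result.setInt(1, chunkSize);
            return result;
        }
        // Continuing after a previous chunk, or resuming from a persisted check position:
        // bind the position first, then the chunk size, mirroring the branch added above.
        Object position = null != previousMaxUniqueKeyValue ? previousMaxUniqueKeyValue : tableCheckPosition;
        PreparedStatement result = connection.prepareStatement(LATER_SQL);
        result.setObject(1, position);
        result.setInt(2, chunkSize);
        return result;
    }
}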
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
index 4b610032745..62bc83d056e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
@@ -21,6 +21,8 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import java.util.Map;
+
/**
* Yaml data consistency check job item progress.
*/
@@ -39,4 +41,6 @@ public final class YamlConsistencyCheckJobItemProgress implements YamlConfigurat
private Long checkBeginTimeMillis;
private Long checkEndTimeMillis;
+
+ private Map<String, Object> tableCheckPositions;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
index 9f4dabdd9ae..0e737997ae8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
@@ -21,6 +21,9 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
/**
* YAML data check job item progress swapper.
*/
@@ -30,23 +33,24 @@ public final class YamlConsistencyCheckJobItemProgressSwapper implements YamlCon
public YamlConsistencyCheckJobItemProgress swapToYamlConfiguration(final ConsistencyCheckJobItemProgress data) {
YamlConsistencyCheckJobItemProgress result = new YamlConsistencyCheckJobItemProgress();
result.setStatus(data.getStatus().name());
- result.setRecordsCount(data.getRecordsCount());
+ result.setTableNames(data.getTableNames());
result.setCheckedRecordsCount(data.getCheckedRecordsCount());
+ result.setRecordsCount(data.getRecordsCount());
result.setCheckBeginTimeMillis(data.getCheckBeginTimeMillis());
result.setCheckEndTimeMillis(data.getCheckEndTimeMillis());
- result.setTableNames(data.getTableNames());
+ result.setTableCheckPositions(data.getTableCheckPositions());
return result;
}
@Override
public ConsistencyCheckJobItemProgress swapToObject(final YamlConsistencyCheckJobItemProgress yamlConfig) {
- ConsistencyCheckJobItemProgress result = new ConsistencyCheckJobItemProgress();
+ Map<String, Object> tableCheckPositions = new LinkedHashMap<>();
+ if (null != yamlConfig.getTableCheckPositions()) {
+ tableCheckPositions.putAll(yamlConfig.getTableCheckPositions());
+ }
+ ConsistencyCheckJobItemProgress result = new ConsistencyCheckJobItemProgress(yamlConfig.getTableNames(), yamlConfig.getCheckedRecordsCount(),
+ yamlConfig.getRecordsCount(), yamlConfig.getCheckBeginTimeMillis(), yamlConfig.getCheckEndTimeMillis(), tableCheckPositions);
result.setStatus(JobStatus.valueOf(yamlConfig.getStatus()));
- result.setRecordsCount(yamlConfig.getRecordsCount());
- result.setCheckedRecordsCount(yamlConfig.getCheckedRecordsCount());
- result.setCheckBeginTimeMillis(yamlConfig.getCheckBeginTimeMillis());
- result.setCheckEndTimeMillis(yamlConfig.getCheckEndTimeMillis());
- result.setTableNames(yamlConfig.getTableNames());
return result;
}
}
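For context, a persisted check progress with the new field might be marshalled roughly like the following YAML (illustrative values only; the exact keys follow the YAML class above, and ordering and formatting depend on YamlEngine):

status: RUNNING
tableNames: t_order
checkedRecordsCount: 5
recordsCount: 10
checkBeginTimeMillis: 1668137344000
tableCheckPositions:
  t_order: 5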
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ReflectionUtil.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ReflectionUtil.java
index 80a7bb279e2..215a6c7f514 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ReflectionUtil.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ReflectionUtil.java
@@ -92,4 +92,33 @@ public final class ReflectionUtil {
method.setAccessible(true);
return method.invoke(target, parameterValues);
}
+
+ /**
+ * Invoke method in parent class.
+ *
+ * @param target target object
+ * @param methodName method name
+ * @param parameterTypes parameter types
+ * @param parameterValues parameter values
+ * @return invoke method result
+ * @throws NoSuchMethodException no such method exception
+ * @throws InvocationTargetException invocation target exception
+ * @throws IllegalAccessException illegal access exception
+ */
+ public static Object invokeMethodInParentClass(final Object target, final String methodName, final Class<?>[] parameterTypes,
+ final Object[] parameterValues) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ Method method = null;
+ for (Class<?> clazz = target.getClass(); clazz != Object.class; clazz = clazz.getSuperclass()) {
+ try {
+ method = clazz.getDeclaredMethod(methodName, parameterTypes);
+ method.setAccessible(true);
+ break;
+ } catch (final NoSuchMethodException ignored) {
+ }
+ }
+ if (null == method) {
+ throw new NoSuchMethodException("not find the method ");
+ }
+ return method.invoke(target, parameterValues);
+ }
}
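A quick usage sketch of the new helper, which walks up the class hierarchy until it finds the named declared method (the Parent and Child classes here are illustrative, not from the codebase; ConsistencyCheckJobTest below uses the same helper to call setJobId declared on a superclass):

import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;

import java.lang.reflect.InvocationTargetException;

final class ParentMethodInvocationExample {
    
    static class Parent {
        
        private String name;
        
        void setName(final String name) {
            this.name = name;
        }
        
        String getName() {
            return name;
        }
    }
    
    static final class Child extends Parent {
    }
    
    public static void main(final String[] args) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        Child child = new Child();
        // setName is declared on Parent, not Child, so a plain getDeclaredMethod on Child would fail.
        ReflectionUtil.invokeMethodInParentClass(child, "setName", new Class[]{String.class}, new Object[]{"foo"});
        System.out.println(child.getName());
    }
}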
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index ac8f1f43580..3f7810248ae 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
@@ -35,7 +36,8 @@ public final class ConsistencyCheckJob extends AbstractSimplePipelineJob {
@Override
protected ConsistencyCheckJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) {
ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- return new ConsistencyCheckJobItemContext(jobConfig, shardingContext.getShardingItem(), JobStatus.RUNNING);
+ ConsistencyCheckJobItemProgress jobItemProgress = (ConsistencyCheckJobItemProgress) getJobAPI().getJobItemProgress(jobConfig.getJobId(), shardingContext.getShardingItem());
+ return new ConsistencyCheckJobItemContext(jobConfig, shardingContext.getShardingItem(), JobStatus.RUNNING, jobItemProgress);
}
@Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index 2ef7f1db3b0..60270e3bc0e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -113,13 +113,9 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
@Override
public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
ConsistencyCheckJobItemContext context = (ConsistencyCheckJobItemContext) jobItemContext;
- ConsistencyCheckJobItemProgress jobItemProgress = new ConsistencyCheckJobItemProgress();
+ ConsistencyCheckJobItemProgress jobItemProgress = new ConsistencyCheckJobItemProgress(String.join(",", context.getTableNames()),
+ context.getCheckedRecordsCount().get(), context.getRecordsCount(), context.getCheckBeginTimeMillis(), context.getCheckEndTimeMillis(), context.getTableCheckPositions());
jobItemProgress.setStatus(context.getStatus());
- jobItemProgress.setCheckedRecordsCount(context.getCheckedRecordsCount().get());
- jobItemProgress.setRecordsCount(context.getRecordsCount());
- jobItemProgress.setCheckBeginTimeMillis(context.getCheckBeginTimeMillis());
- jobItemProgress.setCheckEndTimeMillis(context.getCheckEndTimeMillis());
- jobItemProgress.setTableNames(String.join(",", context.getTableNames()));
YamlConsistencyCheckJobItemProgress yamlJobProgress = swapper.swapToYamlConfiguration(jobItemProgress);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(context.getJobId(), context.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
index 3a06075341c..3a47d8489a3 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
@@ -23,11 +23,15 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJo
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
@@ -58,14 +62,20 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
private Long checkEndTimeMillis;
+ private final Map<String, Object> tableCheckPositions = new ConcurrentHashMap<>();
+
private final ConsistencyCheckJobConfiguration jobConfig;
- public ConsistencyCheckJobItemContext(final ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final JobStatus status) {
+ public ConsistencyCheckJobItemContext(final ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final JobStatus status, final ConsistencyCheckJobItemProgress jobItemProgress) {
this.jobConfig = jobConfig;
jobId = jobConfig.getJobId();
this.shardingItem = shardingItem;
this.status = status;
checkBeginTimeMillis = System.currentTimeMillis();
+ if (null != jobItemProgress) {
+ checkedRecordsCount.set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L));
+ Optional.ofNullable(jobItemProgress.getTableCheckPositions()).ifPresent(tableCheckPositions::putAll);
+ }
}
@Override
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
index e5ae0a9635e..f0ab95487b4 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
@@ -35,6 +35,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Iterator;
import static org.hamcrest.CoreMatchers.is;
@@ -60,7 +61,7 @@ public final class CRC32MatchDataConsistencyCalculateAlgorithmTest {
public void setUp() throws SQLException {
PipelineColumnMetaData uniqueKey = new PipelineColumnMetaData(1, "id", Types.INTEGER, "integer", false, true, true);
parameter = new DataConsistencyCalculateParameter(pipelineDataSource, null,
- "foo_tbl", Arrays.asList("foo_col", "bar_col"), "FIXTURE", "FIXTURE", uniqueKey);
+ "foo_tbl", Arrays.asList("foo_col", "bar_col"), "FIXTURE", "FIXTURE", uniqueKey, Collections.emptyMap());
when(pipelineDataSource.getConnection()).thenReturn(connection);
}
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ReflectionUtilTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ReflectionUtilTest.java
index 98c0b2901f1..e583c19bac6 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ReflectionUtilTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ReflectionUtilTest.java
@@ -49,9 +49,24 @@ public final class ReflectionUtilTest {
assertThat(reflectionFixture.getValue(), is("new_value"));
}
+ @Test
+ public void assertInvokeParentMethod() throws Exception {
+ ReflectionFixture reflectionFixture = new ReflectionFixture();
+ ReflectionUtil.invokeMethodInParentClass(reflectionFixture, "setParentValue", new Class[]{String.class}, new Object[]{"parent_value"});
+ assertThat(reflectionFixture.getParentValue(), is("parent_value"));
+ }
+
+ @NoArgsConstructor
+ private static class ReflectionParentFixture {
+
+ @Getter
+ @Setter
+ private String parentValue;
+ }
+
@AllArgsConstructor
@NoArgsConstructor
- private static final class ReflectionFixture {
+ private static final class ReflectionFixture extends ReflectionParentFixture {
@Getter
@Setter(AccessLevel.PRIVATE)
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index b07b5cf2296..21fb5e91558 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -165,6 +165,7 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
protected void assertCheckMigrationSuccess(final String jobId, final String algorithmType) throws SQLException {
proxyExecuteWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType), 0);
+ // TODO Need to add a case: after stop and then start, the consistency check can continue from the previous progress
List<Map<String, Object>> resultList = Collections.emptyList();
for (int i = 0; i < 10; i++) {
resultList = queryForListWithLog(String.format("SHOW MIGRATION CHECK STATUS '%s'", jobId));
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
index 1d9751ab0b2..163322cdb46 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
@@ -91,12 +91,7 @@ public class TextPrimaryKeyMigrationIT extends AbstractMigrationITCase {
String jobId = listJobId().get(0);
sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')", getSourceTableOrderName(), "1000000000", 1, "afterStop"));
waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
- // TODO The ordering of primary or unique keys for text types is different, but can't reproduce now
- if (DatabaseTypeUtil.isMySQL(getDatabaseType())) {
- assertCheckMigrationSuccess(jobId, "DATA_MATCH");
- } else {
- assertCheckMigrationSuccess(jobId, "DATA_MATCH");
- }
+ assertCheckMigrationSuccess(jobId, "DATA_MATCH");
if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
commitMigrationByJobId(jobId);
List<String> lastJobIds = listJobId();
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index ef276fc0390..9f6c2d5f2b8 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -154,7 +154,7 @@ public final class MigrationJobAPIImplTest {
DataConsistencyCalculateAlgorithm calculateAlgorithm = jobAPI.buildDataConsistencyCalculateAlgorithm(jobConfig, "FIXTURE", null);
ConsistencyCheckJobConfiguration checkJobConfig = mock(ConsistencyCheckJobConfiguration.class);
when(checkJobConfig.getJobId()).thenReturn(jobConfig.getJobId() + "1");
- ConsistencyCheckJobItemContext checkJobItemContext = new ConsistencyCheckJobItemContext(checkJobConfig, 0, JobStatus.RUNNING);
+ ConsistencyCheckJobItemContext checkJobItemContext = new ConsistencyCheckJobItemContext(checkJobConfig, 0, JobStatus.RUNNING, null);
Map<String, DataConsistencyCheckResult> checkResultMap = jobAPI.dataConsistencyCheck(jobConfig, calculateAlgorithm, checkJobItemContext);
assertThat(checkResultMap.size(), is(1));
assertTrue(checkResultMap.get("t_order").getCountCheckResult().isMatched());
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithmTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithmTest.java
new file mode 100644
index 00000000000..d6842a93b1c
--- /dev/null
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithmTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
+
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
+import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public final class DataMatchDataConsistencyCalculateAlgorithmTest {
+
+ private PipelineDataSourceWrapper source;
+
+ private PipelineDataSourceWrapper target;
+
+ @Before
+ public void setUp() throws Exception {
+ source = new PipelineDataSourceWrapper(createHikariDataSource("source_ds"), new H2DatabaseType());
+ createTableAndInitData(source, "t_order_copy");
+ target = new PipelineDataSourceWrapper(createHikariDataSource("target_ds"), new H2DatabaseType());
+ createTableAndInitData(target, "t_order");
+ }
+
+ private HikariDataSource createHikariDataSource(final String databaseName) {
+ HikariDataSource result = new HikariDataSource();
+ result.setJdbcUrl(String.format("jdbc:h2:mem:%s;DATABASE_TO_UPPER=false;MODE=MySQL", databaseName));
+ result.setUsername("root");
+ result.setPassword("root");
+ result.setMaximumPoolSize(10);
+ result.setMinimumIdle(2);
+ result.setConnectionTimeout(15 * 1000);
+ result.setIdleTimeout(40 * 1000);
+ return result;
+ }
+
+ private void createTableAndInitData(final PipelineDataSourceWrapper dataSource, final String tableName) throws SQLException {
+ try (Connection connection = dataSource.getConnection()) {
+ String sql = String.format("CREATE TABLE %s (order_id INT NOT NULL, user_id INT NOT NULL, status VARCHAR(45) NULL, PRIMARY KEY (order_id))", tableName);
+ connection.createStatement().execute(sql);
+ PreparedStatement preparedStatement = connection.prepareStatement(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (?, ?, ?)", tableName));
+ for (int i = 0; i < 10; i++) {
+ preparedStatement.setInt(1, i + 1);
+ preparedStatement.setInt(2, i + 1);
+ preparedStatement.setString(3, "test");
+ preparedStatement.execute();
+ }
+ }
+ }
+
+ @Test
+ public void assertCalculateFromBegin() throws NoSuchFieldException, IllegalAccessException {
+ DataMatchDataConsistencyCalculateAlgorithm calculateAlgorithm = new DataMatchDataConsistencyCalculateAlgorithm();
+ ReflectionUtil.setFieldValue(calculateAlgorithm, "chunkSize", 5);
+ DataConsistencyCalculateParameter sourceParameter = generateParameter(source, "t_order_copy", 0);
+ Optional<DataConsistencyCalculatedResult> sourceCalculateResult = calculateAlgorithm.calculateChunk(sourceParameter);
+ DataConsistencyCalculateParameter targetParameter = generateParameter(target, "t_order", 0);
+ Optional<DataConsistencyCalculatedResult> targetCalculateResult = calculateAlgorithm.calculateChunk(targetParameter);
+ assertTrue(sourceCalculateResult.isPresent());
+ assertTrue(targetCalculateResult.isPresent());
+ assertTrue(sourceCalculateResult.get().getMaxUniqueKeyValue().isPresent());
+ assertTrue(targetCalculateResult.get().getMaxUniqueKeyValue().isPresent());
+ assertThat(sourceCalculateResult.get().getMaxUniqueKeyValue().get(), is(targetCalculateResult.get().getMaxUniqueKeyValue().get()));
+ assertThat(targetCalculateResult.get().getMaxUniqueKeyValue().get(), is(5L));
+ assertEquals(sourceCalculateResult.get(), targetCalculateResult.get());
+ }
+
+ @Test
+ public void assertCalculateFromMiddle() throws NoSuchFieldException, IllegalAccessException {
+ DataMatchDataConsistencyCalculateAlgorithm calculateAlgorithm = new DataMatchDataConsistencyCalculateAlgorithm();
+ ReflectionUtil.setFieldValue(calculateAlgorithm, "chunkSize", 5);
+ DataConsistencyCalculateParameter sourceParameter = generateParameter(source, "t_order_copy", 5);
+ Optional<DataConsistencyCalculatedResult> sourceCalculateResult = calculateAlgorithm.calculateChunk(sourceParameter);
+ DataConsistencyCalculateParameter targetParameter = generateParameter(target, "t_order", 5);
+ Optional<DataConsistencyCalculatedResult> targetCalculateResult = calculateAlgorithm.calculateChunk(targetParameter);
+ assertTrue(sourceCalculateResult.isPresent());
+ assertTrue(targetCalculateResult.isPresent());
+ assertTrue(sourceCalculateResult.get().getMaxUniqueKeyValue().isPresent());
+ assertTrue(targetCalculateResult.get().getMaxUniqueKeyValue().isPresent());
+ assertThat(sourceCalculateResult.get().getMaxUniqueKeyValue().get(), is(targetCalculateResult.get().getMaxUniqueKeyValue().get()));
+ assertThat(targetCalculateResult.get().getMaxUniqueKeyValue().get(), is(10L));
+ assertEquals(sourceCalculateResult.get(), targetCalculateResult.get());
+ }
+
+ private DataConsistencyCalculateParameter generateParameter(final PipelineDataSourceWrapper dataSource, final String logicTableName, final Object dataCheckPositionValue) {
+ PipelineColumnMetaData uniqueKey = new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "integer", false, true, true);
+ return new DataConsistencyCalculateParameter(dataSource, null, logicTableName, Collections.emptyList(),
+ "MySQL", "MySQL", uniqueKey, dataCheckPositionValue);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ source.close();
+ target.close();
+ }
+}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
index b61af786c4a..9d9d970be0a 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
@@ -22,10 +22,17 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
+import java.util.Optional;
+
@RequiredArgsConstructor
@EqualsAndHashCode
@Getter
public final class FixtureDataConsistencyCalculatedResult implements DataConsistencyCalculatedResult {
private final int recordsCount;
+
+ @Override
+ public Optional<Object> getMaxUniqueKeyValue() {
+ return Optional.empty();
+ }
}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
new file mode 100644
index 00000000000..22c07c2caf0
--- /dev/null
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+public final class ConsistencyCheckJobTest {
+
+ @BeforeClass
+ public static void beforeClass() {
+ PipelineContextUtil.mockModeConfigAndContextManager();
+ }
+
+ @Test
+ public void assertBuildPipelineJobItemContext() throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+ YamlConsistencyCheckJobItemProgress jobItemProgress = new YamlConsistencyCheckJobItemProgress();
+ jobItemProgress.setStatus(JobStatus.RUNNING.name());
+ Map<String, Object> expectTableCheckPosition = new HashMap<>();
+ expectTableCheckPosition.put("t_order", 100);
+ jobItemProgress.setTableCheckPositions(expectTableCheckPosition);
+ String checkJobId = "j0201001";
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(checkJobId, 0, YamlEngine.marshal(jobItemProgress));
+ ConsistencyCheckJobConfiguration jobConfig = new ConsistencyCheckJobConfiguration(checkJobId, "", null, null);
+ YamlConsistencyCheckJobConfiguration yamlJobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToYamlConfiguration(jobConfig);
+ ShardingContext shardingContext = new ShardingContext(checkJobId, "", 1, YamlEngine.marshal(yamlJobConfig), 0, "");
+ ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob();
+ ReflectionUtil.invokeMethodInParentClass(consistencyCheckJob, "setJobId", new Class[]{String.class}, new Object[]{checkJobId});
+ ConsistencyCheckJobItemContext actualItemContext = consistencyCheckJob.buildPipelineJobItemContext(shardingContext);
+ assertThat(actualItemContext.getTableCheckPositions(), is(expectTableCheckPosition));
+ }
+}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
index 1b008f63e67..c6f494a0fb5 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
@@ -67,7 +67,7 @@ public final class MigrationDataConsistencyCheckerTest {
private ConsistencyCheckJobItemContext createConsistencyCheckJobItemConfig() {
ConsistencyCheckJobConfiguration jobConfig = new ConsistencyCheckJobConfiguration("", "", "", new Properties());
- return new ConsistencyCheckJobItemContext(jobConfig, 0, JobStatus.RUNNING);
+ return new ConsistencyCheckJobItemContext(jobConfig, 0, JobStatus.RUNNING, null);
}
private MigrationJobConfiguration createJobConfiguration() throws SQLException {