This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 486a1565a31 Revise show migration check status DistSQL impl; Clean
unused dataConsistencyCheck methods (#21488)
486a1565a31 is described below
commit 486a1565a31ce11ca09e0360621dbfb81d7c9d18
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Oct 11 15:57:36 2022 +0800
Revise show migration check status DistSQL impl; Clean unused
dataConsistencyCheck methods (#21488)
* Revise show migration check status DistSQL impl
* Simplify aggregateDataConsistencyCheckResults impl
* Clean unused dataConsistencyCheck methods; Improve dataConsistencyCheck
unit test
* Remove unused transient
* Recover wait time in IT
* Fix getJobProgressInfo
* Convert "NULL" to "" in check status
* Fix setRecordsCount too late
* Rename to emptyIfNull
---
.../ShowMigrationCheckStatusQueryResultSet.java | 13 +++++--
.../api/InventoryIncrementalJobPublicAPI.java | 20 ----------
.../consistency/DataConsistencyCheckResult.java | 9 +++++
.../api/pojo/ConsistencyCheckJobProgressInfo.java | 2 +-
.../core/api/InventoryIncrementalJobAPI.java | 2 +-
.../AbstractInventoryIncrementalJobAPIImpl.java | 22 +----------
...SingleTableInventoryDataConsistencyChecker.java | 25 +++++++------
.../ConsistencyCheckJobAPIImpl.java | 43 ++++++++++++----------
.../migration/MigrationDataConsistencyChecker.java | 7 +---
.../yaml/metadata/YamlPipelineColumnMetaData.java | 4 ++
.../data/pipeline/cases/base/BaseITCase.java | 2 +-
.../cases/migration/AbstractMigrationITCase.java | 2 +-
.../core/api/impl/MigrationJobAPIImplTest.java | 24 ++++--------
.../core/util/JobConfigurationBuilder.java | 6 ++-
14 files changed, 77 insertions(+), 104 deletions(-)
diff --git
a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
index e893f660de8..5c4a3b0247b 100644
---
a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
+++
b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
@@ -47,15 +47,20 @@ public final class ShowMigrationCheckStatusQueryResultSet
implements DatabaseDis
ConsistencyCheckJobProgressInfo progressInfo =
JOB_API.getJobProgressInfo(checkMigrationStatement.getJobId());
List<Collection<Object>> result = new LinkedList<>();
String checkResult = null == progressInfo.getResult() ? "" :
progressInfo.getResult().toString();
- result.add(Arrays.asList(progressInfo.getTableName(), checkResult,
String.valueOf(progressInfo.getFinishedPercentage()),
- ObjectUtils.defaultIfNull(progressInfo.getRemainingSeconds(),
""), progressInfo.getCheckBeginTime(),
ObjectUtils.defaultIfNull(progressInfo.getCheckEndTime(), ""),
- ObjectUtils.defaultIfNull(progressInfo.getDurationSeconds(),
""), progressInfo.getErrorMessage()));
+ result.add(Arrays.asList(emptyIfNull(progressInfo.getTableNames()),
checkResult, String.valueOf(progressInfo.getFinishedPercentage()),
+ emptyIfNull(progressInfo.getRemainingSeconds()),
+ emptyIfNull(progressInfo.getCheckBeginTime()),
emptyIfNull(progressInfo.getCheckEndTime()),
+ emptyIfNull(progressInfo.getDurationSeconds()),
emptyIfNull(progressInfo.getErrorMessage())));
data = result.iterator();
}
+ private Object emptyIfNull(final Object object) {
+ return ObjectUtils.defaultIfNull(object, "");
+ }
+
@Override
public Collection<String> getColumnNames() {
- return Arrays.asList("table_name", "result", "finished_percentage",
"remaining_seconds", "check_begin_time", "check_end_time", "duration_seconds",
"error_message");
+ return Arrays.asList("tables", "result", "finished_percentage",
"remaining_seconds", "check_begin_time", "check_end_time", "duration_seconds",
"error_message");
}
@Override
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
index 47b0051238c..20d50211524 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.api;
-import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
@@ -27,7 +26,6 @@ import
org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Map;
-import java.util.Properties;
/**
* Inventory incremental job public API.
@@ -92,22 +90,4 @@ public interface InventoryIncrementalJobPublicAPI extends
PipelineJobPublicAPI,
* @return data consistency check algorithms
*/
Collection<DataConsistencyCheckAlgorithmInfo>
listDataConsistencyCheckAlgorithms();
-
- /**
- * Do data consistency check.
- *
- * @param jobId job id
- * @return each logic table check result
- */
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId);
-
- /**
- * Do data consistency check.
- *
- * @param jobId job id
- * @param algorithmType algorithm type
- * @param algorithmProps algorithm props. Nullable
- * @return each logic table check result
- */
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId,
String algorithmType, Properties algorithmProps);
}
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
index 00bf9120381..71b696fcded 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
@@ -32,4 +32,13 @@ public final class DataConsistencyCheckResult {
private final DataConsistencyCountCheckResult countCheckResult;
private final DataConsistencyContentCheckResult contentCheckResult;
+
+ /**
+ * Is count and content matched.
+ *
+ * @return matched or not
+ */
+ public boolean isMatched() {
+ return countCheckResult.isMatched() && contentCheckResult.isMatched();
+ }
}
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
index 59604a4e991..e59755a9ccc 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
@@ -26,7 +26,7 @@ import lombok.Data;
@Data
public final class ConsistencyCheckJobProgressInfo {
- private String tableName;
+ private String tableNames;
private Boolean result;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index 668724a5d56..79ad6b18c56 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -57,7 +57,7 @@ public interface InventoryIncrementalJobAPI extends
PipelineJobAPI {
*
* @param pipelineJobConfig job configuration
* @param calculateAlgorithm calculate algorithm
- * @param checkJobItemContext consistency check job progress listener
+ * @param checkJobItemContext consistency check job item context
* @return each logic table check result
*/
Map<String, DataConsistencyCheckResult>
dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig,
DataConsistencyCalculateAlgorithm calculateAlgorithm,
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 19be5acdd2c..55e0e245383 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -155,23 +155,6 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
:
DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType,
algorithmProps);
}
- @Override
- public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
String jobId) {
- checkModeConfig();
- log.info("Data consistency check for job {}", jobId);
- PipelineJobConfiguration jobConfig =
getJobConfiguration(getElasticJobConfigPOJO(jobId));
- DataConsistencyCalculateAlgorithm calculateAlgorithm =
buildDataConsistencyCalculateAlgorithm(jobConfig, null, null);
- return dataConsistencyCheck(jobConfig, calculateAlgorithm, null);
- }
-
- @Override
- public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
String jobId, final String algorithmType, final Properties algorithmProps) {
- checkModeConfig();
- log.info("Data consistency check for job {}, algorithmType: {}",
jobId, algorithmType);
- PipelineJobConfiguration jobConfig =
getJobConfiguration(getElasticJobConfigPOJO(jobId));
- return dataConsistencyCheck(jobConfig,
buildDataConsistencyCalculateAlgorithm(jobConfig, algorithmType,
algorithmProps), null);
- }
-
@Override
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm
calculateAlgorithm,
final
ConsistencyCheckJobItemContext checkJobItemContext) {
@@ -193,10 +176,7 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
}
for (Entry<String, DataConsistencyCheckResult> entry :
checkResults.entrySet()) {
DataConsistencyCheckResult checkResult = entry.getValue();
- boolean isCountMatched =
checkResult.getCountCheckResult().isMatched();
- boolean isContentMatched =
checkResult.getContentCheckResult().isMatched();
- if (!isCountMatched || !isContentMatched) {
- log.error("job: {}, table: {} data consistency check failed,
count matched: {}, content matched: {}", jobId, entry.getKey(), isCountMatched,
isContentMatched);
+ if (!checkResult.isMatched()) {
return false;
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index 3547a2a1f48..a9eaa29f2a2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -100,12 +100,20 @@ public final class
SingleTableInventoryDataConsistencyChecker {
}
private DataConsistencyCheckResult check(final
DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor
executor,
- final
ConsistencyCheckJobItemContext consistencyCheckJobItemContext) {
+ final
ConsistencyCheckJobItemContext checkJobItemContext) {
String sourceDatabaseType =
sourceDataSource.getDatabaseType().getType();
String targetDatabaseType =
targetDataSource.getDatabaseType().getType();
String sourceTableName = sourceTable.getTableName().getOriginal();
String schemaName = sourceTable.getSchemaName().getOriginal();
PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(schemaName, sourceTableName);
+ if (null != checkJobItemContext) {
+
checkJobItemContext.setTableNames(Collections.singletonList(sourceTableName));
+ InventoryIncrementalJobPublicAPI inventoryIncrementalJobPublicAPI
=
PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(PipelineJobIdUtils.parseJobType(jobId).getTypeName());
+ Map<Integer, InventoryIncrementalJobItemProgress> jobProgress =
inventoryIncrementalJobPublicAPI.getJobProgress(jobId);
+ long recordsCount =
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
+ checkJobItemContext.setRecordsCount(recordsCount);
+ log.info("check, get records count: {}", recordsCount);
+ }
ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new
PipelineTableDataConsistencyCheckLoadingFailedException(sourceTableName));
Collection<String> columnNames = tableMetaData.getColumnNames();
DataConsistencyCalculateParameter sourceParameter = buildParameter(
@@ -114,13 +122,6 @@ public final class
SingleTableInventoryDataConsistencyChecker {
targetDataSource, targetTable.getSchemaName().getOriginal(),
targetTable.getTableName().getOriginal(), columnNames, targetDatabaseType,
sourceDatabaseType, uniqueKey);
Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults =
calculateAlgorithm.calculate(sourceParameter).iterator();
Iterator<DataConsistencyCalculatedResult> targetCalculatedResults =
calculateAlgorithm.calculate(targetParameter).iterator();
- if (null != consistencyCheckJobItemContext) {
-
consistencyCheckJobItemContext.setTableNames(Collections.singletonList(sourceTableName));
- InventoryIncrementalJobPublicAPI inventoryIncrementalJobPublicAPI
=
PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(PipelineJobIdUtils.parseJobType(jobId).getTypeName());
- Map<Integer, InventoryIncrementalJobItemProgress> jobProgress =
inventoryIncrementalJobPublicAPI.getJobProgress(jobId);
- long recordsCount =
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
- consistencyCheckJobItemContext.setRecordsCount(recordsCount);
- }
long sourceRecordsCount = 0;
long targetRecordsCount = 0;
boolean contentMatched = true;
@@ -139,12 +140,12 @@ public final class
SingleTableInventoryDataConsistencyChecker {
log.info("content matched false, jobId={}, sourceTable={},
targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
break;
}
- if (null != consistencyCheckJobItemContext) {
- consistencyCheckJobItemContext.onProgressUpdated(new
PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+ if (null != checkJobItemContext) {
+ checkJobItemContext.onProgressUpdated(new
PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
}
}
- if (null != consistencyCheckJobItemContext) {
-
consistencyCheckJobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
+ if (null != checkJobItemContext) {
+
checkJobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
}
return new DataConsistencyCheckResult(new
DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new
DataConsistencyContentCheckResult(contentMatched));
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index c46efa359f1..60f9e0da017 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -36,11 +36,13 @@ import
org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgr
import
org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import
org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyFinishedException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfiguration;
@@ -49,13 +51,12 @@ import
org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
/**
@@ -64,6 +65,8 @@ import java.util.Optional;
@Slf4j
public final class ConsistencyCheckJobAPIImpl extends
AbstractPipelineJobAPIImpl implements ConsistencyCheckJobAPI {
+ private static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
private final YamlConsistencyCheckJobProgressSwapper swapper = new
YamlConsistencyCheckJobProgressSwapper();
@Override
@@ -175,33 +178,35 @@ public final class ConsistencyCheckJobAPIImpl extends
AbstractPipelineJobAPIImpl
if (null == jobItemProgress) {
return result;
}
- int finishedPercentage;
LocalDateTime checkBeginTime = new
Timestamp(jobItemProgress.getCheckBeginTimeMillis()).toLocalDateTime();
- if (null != jobItemProgress.getRecordsCount() &&
Objects.equals(jobItemProgress.getCheckedRecordsCount(),
jobItemProgress.getRecordsCount())) {
- finishedPercentage = 100;
+ if (null == jobItemProgress.getRecordsCount()) {
+ result.setFinishedPercentage(0);
+ result.setResult(false);
+ return result;
+ }
+ long recordsCount = jobItemProgress.getRecordsCount();
+ if (JobStatus.FINISHED == jobItemProgress.getStatus()) {
+ result.setFinishedPercentage(100);
LocalDateTime checkEndTime = new
Timestamp(jobItemProgress.getCheckEndTimeMillis()).toLocalDateTime();
Duration duration = Duration.between(checkBeginTime, checkEndTime);
- result.setDurationSeconds(duration.toMillis() / 1000);
+ result.setDurationSeconds(duration.getSeconds());
result.setCheckEndTime(DATE_TIME_FORMATTER.format(checkEndTime));
result.setRemainingSeconds(0L);
} else {
- if (null == jobItemProgress.getRecordsCount()) {
- finishedPercentage = 0;
- } else {
- finishedPercentage = Math.min(100,
BigDecimal.valueOf(Math.floorDiv(jobItemProgress.getCheckedRecordsCount() *
100, jobItemProgress.getRecordsCount())).intValue());
- Duration duration = Duration.between(checkBeginTime,
LocalDateTime.now());
- long remainMills = jobItemProgress.getRecordsCount() * 100 /
jobItemProgress.getCheckedRecordsCount() * duration.toMillis();
- result.setRemainingSeconds(remainMills / 1000);
- }
+ long checkedRecordsCount =
Math.min(jobItemProgress.getCheckedRecordsCount(), recordsCount);
+ result.setFinishedPercentage((int) (checkedRecordsCount * 100 /
recordsCount));
+ Duration duration = Duration.between(checkBeginTime,
LocalDateTime.now());
+ result.setDurationSeconds(duration.getSeconds());
+ long remainingMills = (recordsCount - checkedRecordsCount) /
recordsCount * duration.toMillis();
+ result.setRemainingSeconds(remainingMills / 1000);
}
- result.setFinishedPercentage(finishedPercentage);
- String tableName = null == jobItemProgress.getTableNames() ? null :
jobItemProgress.getTableNames().split(",")[0];
- result.setTableName(Optional.ofNullable(tableName).orElse(""));
+ String tableNames = jobItemProgress.getTableNames();
+ result.setTableNames(Optional.ofNullable(tableNames).orElse(""));
result.setCheckBeginTime(DATE_TIME_FORMATTER.format(checkBeginTime));
result.setErrorMessage(getJobItemErrorMessage(checkJobId, 0));
Map<String, DataConsistencyCheckResult> checkJobResult =
PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckJobResult(parentJobId,
checkJobId);
- Optional<DataConsistencyCheckResult> dataConsistencyCheckResult =
Optional.ofNullable(checkJobResult.get(tableName));
- dataConsistencyCheckResult.ifPresent(optional ->
result.setResult(optional.getContentCheckResult().isMatched()));
+ InventoryIncrementalJobAPI inventoryIncrementalJobAPI =
(InventoryIncrementalJobAPI)
PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(parentJobId));
+
result.setResult(inventoryIncrementalJobAPI.aggregateDataConsistencyCheckResults(parentJobId,
checkJobResult));
return result;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index 91fa69b6fdc..f6b97a2a455 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -24,7 +24,6 @@ import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMap
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
@@ -75,12 +74,10 @@ public final class MigrationDataConsistencyChecker
implements PipelineDataConsis
verifyPipelineDatabaseType(calculateAlgorithm, jobConfig.getTarget());
SchemaTableName sourceTable = new SchemaTableName(new
SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getSourceTableName())),
new TableName(jobConfig.getSourceTableName()));
SchemaTableName targetTable = new SchemaTableName(new
SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getTargetTableName())),
new TableName(jobConfig.getTargetTableName()));
- PipelineDataSourceConfiguration sourceDataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(),
jobConfig.getSource().getParameter());
- PipelineDataSourceConfiguration targetDataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(),
jobConfig.getTarget().getParameter());
Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>();
try (
- PipelineDataSourceWrapper sourceDataSource =
PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
- PipelineDataSourceWrapper targetDataSource =
PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
+ PipelineDataSourceWrapper sourceDataSource =
PipelineDataSourceFactory.newInstance(jobConfig.getSource());
+ PipelineDataSourceWrapper targetDataSource =
PipelineDataSourceFactory.newInstance(jobConfig.getTarget())) {
PipelineTableMetaDataLoader metaDataLoader = new
StandardPipelineTableMetaDataLoader(sourceDataSource);
SingleTableInventoryDataConsistencyChecker
singleTableInventoryChecker = new
SingleTableInventoryDataConsistencyChecker(jobConfig.getJobId(),
sourceDataSource, targetDataSource,
sourceTable, targetTable, jobConfig.getUniqueKeyColumn(),
metaDataLoader, readRateLimitAlgorithm, checkJobItemContext);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/metadata/YamlPipelineColumnMetaData.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/metadata/YamlPipelineColumnMetaData.java
index e6f88962b17..c4a96dc64e4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/metadata/YamlPipelineColumnMetaData.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/metadata/YamlPipelineColumnMetaData.java
@@ -17,12 +17,16 @@
package org.apache.shardingsphere.data.pipeline.yaml.metadata;
+import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
/**
* Yaml pipeline column meta data.
*/
+@NoArgsConstructor
+@AllArgsConstructor
@Data
public final class YamlPipelineColumnMetaData implements YamlConfiguration {
diff --git
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index f269624890e..328bbfc0b1d 100644
---
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -286,7 +286,7 @@ public abstract class BaseITCase {
}
assertFalse(CollectionUtils.containsAny(actualStatus,
Arrays.asList(JobStatus.PREPARING_FAILURE.name(),
JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
- if (Collections.min(incrementalIdleSecondsList) < 15) {
+ if (Collections.min(incrementalIdleSecondsList) <= 5) {
ThreadUtil.sleep(3, TimeUnit.SECONDS);
continue;
}
diff --git
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index a04d7224de4..ac23115bf9b 100644
---
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -172,7 +172,7 @@ public abstract class AbstractMigrationITCase extends
BaseITCase {
if (checkEndTimeList.size() == resultList.size()) {
break;
}
- ThreadUtil.sleep(5, TimeUnit.SECONDS);
+ ThreadUtil.sleep(3, TimeUnit.SECONDS);
}
log.info("check job results: {}", resultList);
for (Map<String, Object> entry : resultList) {
diff --git
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 9cd96221ed8..452001a8b4a 100644
---
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -27,17 +27,16 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import
org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.junit.AfterClass;
@@ -149,22 +148,13 @@ public final class MigrationJobAPIImplTest {
}
@Test
- public void assertDataConsistencyCheck() throws NoSuchFieldException,
IllegalAccessException {
- MigrationJobConfiguration jobConfiguration =
JobConfigurationBuilder.createJobConfiguration();
- ReflectionUtil.setFieldValue(jobConfiguration, "uniqueKeyColumn", new
PipelineColumnMetaData(1, "order_id", 4, "", false, true, true));
- Optional<String> jobId = jobAPI.start(jobConfiguration);
- assertTrue(jobId.isPresent());
- initTableData(jobAPI.getJobConfiguration(jobId.get()));
- Map<String, DataConsistencyCheckResult> checkResultMap =
jobAPI.dataConsistencyCheck(jobId.get());
- assertThat(checkResultMap.size(), is(1));
- }
-
- @Test
- public void assertDataConsistencyCheckWithAlgorithm() {
- Optional<String> jobId =
jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ public void assertDataConsistencyCheck() {
+ MigrationJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
+ initTableData(jobConfig);
+ Optional<String> jobId = jobAPI.start(jobConfig);
assertTrue(jobId.isPresent());
- initTableData(jobAPI.getJobConfiguration(jobId.get()));
- Map<String, DataConsistencyCheckResult> checkResultMap =
jobAPI.dataConsistencyCheck(jobId.get(), "FIXTURE", null);
+ DataConsistencyCalculateAlgorithm calculateAlgorithm =
jobAPI.buildDataConsistencyCalculateAlgorithm(jobConfig, "FIXTURE", null);
+ Map<String, DataConsistencyCheckResult> checkResultMap =
jobAPI.dataConsistencyCheck(jobConfig, calculateAlgorithm, null);
assertThat(checkResultMap.size(), is(1));
assertTrue(checkResultMap.get("t_order").getCountCheckResult().isMatched());
assertThat(checkResultMap.get("t_order").getCountCheckResult().getTargetRecordsCount(),
is(2L));
diff --git
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index 31a0d5ea6a4..72f4bb695d3 100644
---
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -21,8 +21,6 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -31,6 +29,9 @@ import
org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
+import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
+import
org.apache.shardingsphere.data.pipeline.yaml.metadata.YamlPipelineColumnMetaData;
/**
* Job configuration builder.
@@ -53,6 +54,7 @@ public final class JobConfigurationBuilder {
result.setSource(createYamlPipelineDataSourceConfiguration(new
StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("migration_standard_jdbc_source.yaml"))));
result.setTarget(createYamlPipelineDataSourceConfiguration(new
ShardingSpherePipelineDataSourceConfiguration(
ConfigurationFileUtil.readFile("migration_sharding_sphere_jdbc_target.yaml"))));
+ result.setUniqueKeyColumn(new YamlPipelineColumnMetaData(1,
"order_id", 4, "", false, true, true));
PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION).extendYamlJobConfiguration(result);
return new YamlMigrationJobConfigurationSwapper().swapToObject(result);
}