This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c4b847a7a2c Refactor show migration check status DistSQL
implementation (#21441)
c4b847a7a2c is described below
commit c4b847a7a2c737a1885ead254a5c9ae229f64136
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Oct 11 11:24:45 2022 +0800
Refactor show migration check status DistSQL implementation (#21441)
* Refactor show migration check status DistSQL implementation
* Fix ci
* Fix conflict
* Fix ci
* Fix codestyle
* Fix name
---
.../ShowMigrationCheckStatusQueryResultSet.java | 24 ++++-----
.../ShowMigrationCheckStatusStatement.java | 2 +-
.../pipeline/api/ConsistencyCheckJobPublicAPI.java | 9 ++++
.../job/progress/ConsistencyCheckJobProgress.java | 10 ++++
.../PipelineJobProgressUpdatedParameter.java | 4 +-
.../api/pojo/ConsistencyCheckJobProgressInfo.java} | 24 +++++++--
.../pipeline/core/api/GovernanceRepositoryAPI.java | 4 +-
.../core/api/InventoryIncrementalJobAPI.java | 5 +-
.../AbstractInventoryIncrementalJobAPIImpl.java | 14 +++--
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 4 +-
...SingleTableInventoryDataConsistencyChecker.java | 33 ++++++++++--
.../pipeline/core/importer/DefaultImporter.java | 2 +-
.../yaml/YamlConsistencyCheckJobProgress.java | 10 ++++
.../YamlConsistencyCheckJobProgressSwapper.java | 10 ++++
.../consistencycheck/ConsistencyCheckJob.java | 3 ++
.../ConsistencyCheckJobAPIImpl.java | 61 +++++++++++++++++++---
.../ConsistencyCheckJobItemContext.java | 25 ++++++++-
.../ConsistencyCheckTasksRunner.java | 3 +-
.../migration/MigrationDataConsistencyChecker.java | 11 ++--
.../scenario/migration/MigrationJobAPIImpl.java | 6 ++-
.../migration/MigrationJobItemContext.java | 3 +-
.../data/pipeline/cases/base/BaseITCase.java | 3 +-
.../cases/migration/AbstractMigrationITCase.java | 17 +++---
.../core/api/impl/MigrationJobAPIImplTest.java | 2 +-
.../MigrationDataConsistencyCheckerTest.java | 21 +++++++-
25 files changed, 245 insertions(+), 65 deletions(-)
diff --git
a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
index 5c90cfbb7b1..e893f660de8 100644
---
a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
+++
b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
@@ -17,22 +17,20 @@
package org.apache.shardingsphere.migration.distsql.handler.query;
+import org.apache.commons.lang3.ObjectUtils;
import
org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
+import
org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgressInfo;
import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
/**
* Show migration check status query result set.
@@ -46,20 +44,18 @@ public final class ShowMigrationCheckStatusQueryResultSet
implements DatabaseDis
@Override
public void init(final ShardingSphereDatabase database, final SQLStatement
sqlStatement) {
ShowMigrationCheckStatusStatement checkMigrationStatement =
(ShowMigrationCheckStatusStatement) sqlStatement;
- Map<String, DataConsistencyCheckResult> consistencyCheckResult =
JOB_API.getLatestDataConsistencyCheckResult(checkMigrationStatement.getJobId());
- List<Collection<Object>> result = new
ArrayList<>(consistencyCheckResult.size());
- for (Entry<String, DataConsistencyCheckResult> entry :
consistencyCheckResult.entrySet()) {
- DataConsistencyCheckResult value = entry.getValue();
- DataConsistencyCountCheckResult countCheckResult =
value.getCountCheckResult();
- result.add(Arrays.asList(entry.getKey(),
countCheckResult.getSourceRecordsCount(),
countCheckResult.getTargetRecordsCount(),
String.valueOf(countCheckResult.isMatched()),
-
String.valueOf(value.getContentCheckResult().isMatched())));
- }
+ ConsistencyCheckJobProgressInfo progressInfo =
JOB_API.getJobProgressInfo(checkMigrationStatement.getJobId());
+ List<Collection<Object>> result = new LinkedList<>();
+ String checkResult = null == progressInfo.getResult() ? "" :
progressInfo.getResult().toString();
+ result.add(Arrays.asList(progressInfo.getTableName(), checkResult,
String.valueOf(progressInfo.getFinishedPercentage()),
+ ObjectUtils.defaultIfNull(progressInfo.getRemainingSeconds(),
""), progressInfo.getCheckBeginTime(),
ObjectUtils.defaultIfNull(progressInfo.getCheckEndTime(), ""),
+ ObjectUtils.defaultIfNull(progressInfo.getDurationSeconds(),
""), progressInfo.getErrorMessage()));
data = result.iterator();
}
@Override
public Collection<String> getColumnNames() {
- return Arrays.asList("table_name", "source_records_count",
"target_records_count", "records_count_matched", "records_content_matched");
+ return Arrays.asList("table_name", "result", "finished_percentage",
"remaining_seconds", "check_begin_time", "check_end_time", "duration_seconds",
"error_message");
}
@Override
diff --git
a/features/sharding/distsql/statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java
b/features/sharding/distsql/statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java
index fe8a36c0e9d..e9b2e6932d5 100644
---
a/features/sharding/distsql/statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java
+++
b/features/sharding/distsql/statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java
@@ -22,7 +22,7 @@ import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
/**
- * Show check migration status statement.
+ * Show migration check status statement.
*/
@RequiredArgsConstructor
@Getter
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
index 008f0002705..2e59d9e3976 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.api;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import
org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgressInfo;
import
org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
@@ -59,4 +60,12 @@ public interface ConsistencyCheckJobPublicAPI extends
PipelineJobPublicAPI, Requ
* @param parentJobId parent job id
*/
void stopByParentJobId(String parentJobId);
+
+ /**
+ * Get consistency job progress info.
+ *
+ * @param parentJobId parent job id
+ * @return consistency job progress info
+ */
+ ConsistencyCheckJobProgressInfo getJobProgressInfo(String parentJobId);
}
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
index 22a16efa840..e3297179140 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
@@ -31,4 +31,14 @@ import
org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
public final class ConsistencyCheckJobProgress implements
PipelineJobItemProgress {
private JobStatus status = JobStatus.RUNNING;
+
+ private String tableNames;
+
+ private Long checkedRecordsCount;
+
+ private Long recordsCount;
+
+ private Long checkBeginTimeMillis;
+
+ private Long checkEndTimeMillis;
}
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java
index 1a21a14bb7e..ab195bc42e4 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java
@@ -27,7 +27,5 @@ import lombok.RequiredArgsConstructor;
@Getter
public final class PipelineJobProgressUpdatedParameter {
- private final int insertedRecordsCount;
-
- private final int deletedRecordsCount;
+ private final int processedRecordsCount;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
similarity index 65%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
copy to
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
index f0baaf647c8..59604a4e991 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
@@ -15,16 +15,30 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
+package org.apache.shardingsphere.data.pipeline.api.pojo;
import lombok.Data;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
/**
- * Yaml data consistency check job progress.
+ * Consistency check jon progress info.
*/
+
@Data
-public final class YamlConsistencyCheckJobProgress implements
YamlConfiguration {
+public final class ConsistencyCheckJobProgressInfo {
+
+ private String tableName;
+
+ private Boolean result;
+
+ private int finishedPercentage;
+
+ private Long remainingSeconds;
+
+ private String checkBeginTime;
+
+ private String checkEndTime;
+
+ private Long durationSeconds;
- private String status;
+ private String errorMessage;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index e7dc53f1050..17874b2e8bb 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -76,11 +76,11 @@ public interface GovernanceRepositoryAPI {
/**
* Get check job result.
*
- * @param jobId job id
+ * @param parentJobId job id
* @param checkJobId check job id
* @return check job result
*/
- Map<String, DataConsistencyCheckResult> getCheckJobResult(String jobId,
String checkJobId);
+ Map<String, DataConsistencyCheckResult> getCheckJobResult(String
parentJobId, String checkJobId);
/**
* Persist check job result.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index 077d08c7281..668724a5d56 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.api;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import java.util.Map;
@@ -56,9 +57,11 @@ public interface InventoryIncrementalJobAPI extends
PipelineJobAPI {
*
* @param pipelineJobConfig job configuration
* @param calculateAlgorithm calculate algorithm
+ * @param checkJobItemContext consistency check job progress listener
* @return each logic table check result
*/
- Map<String, DataConsistencyCheckResult>
dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig,
DataConsistencyCalculateAlgorithm calculateAlgorithm);
+ Map<String, DataConsistencyCheckResult>
dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig,
DataConsistencyCalculateAlgorithm calculateAlgorithm,
+
ConsistencyCheckJobItemContext checkJobItemContext);
/**
* Aggregate data consistency check results.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 2881877cbe6..19be5acdd2c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -33,6 +33,7 @@ import
org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProce
import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
import
org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import
org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
@@ -160,7 +161,7 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
log.info("Data consistency check for job {}", jobId);
PipelineJobConfiguration jobConfig =
getJobConfiguration(getElasticJobConfigPOJO(jobId));
DataConsistencyCalculateAlgorithm calculateAlgorithm =
buildDataConsistencyCalculateAlgorithm(jobConfig, null, null);
- return dataConsistencyCheck(jobConfig, calculateAlgorithm);
+ return dataConsistencyCheck(jobConfig, calculateAlgorithm, null);
}
@Override
@@ -168,18 +169,21 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
checkModeConfig();
log.info("Data consistency check for job {}, algorithmType: {}",
jobId, algorithmType);
PipelineJobConfiguration jobConfig =
getJobConfiguration(getElasticJobConfigPOJO(jobId));
- return dataConsistencyCheck(jobConfig,
buildDataConsistencyCalculateAlgorithm(jobConfig, algorithmType,
algorithmProps));
+ return dataConsistencyCheck(jobConfig,
buildDataConsistencyCalculateAlgorithm(jobConfig, algorithmType,
algorithmProps), null);
}
@Override
- public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm
calculateAlgorithm) {
+ public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm
calculateAlgorithm,
+ final
ConsistencyCheckJobItemContext checkJobItemContext) {
String jobId = jobConfig.getJobId();
- Map<String, DataConsistencyCheckResult> result =
buildPipelineDataConsistencyChecker(jobConfig,
buildPipelineProcessContext(jobConfig)).check(calculateAlgorithm);
+ PipelineDataConsistencyChecker dataConsistencyChecker =
buildPipelineDataConsistencyChecker(jobConfig,
buildPipelineProcessContext(jobConfig), checkJobItemContext);
+ Map<String, DataConsistencyCheckResult> result =
dataConsistencyChecker.check(calculateAlgorithm);
log.info("job {} with check algorithm '{}' data consistency checker
result {}", jobId, calculateAlgorithm.getType(), result);
return result;
}
- protected abstract PipelineDataConsistencyChecker
buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig,
InventoryIncrementalProcessContext processContext);
+ protected abstract PipelineDataConsistencyChecker
buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig,
InventoryIncrementalProcessContext processContext,
+
ConsistencyCheckJobItemContext checkJobItemContext);
@Override
public boolean aggregateDataConsistencyCheckResults(final String jobId,
final Map<String, DataConsistencyCheckResult> checkResults) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 86ce2c90139..64a876cc8f9 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -77,9 +77,9 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
@SuppressWarnings("unchecked")
@Override
- public Map<String, DataConsistencyCheckResult> getCheckJobResult(final
String jobId, final String checkJobId) {
+ public Map<String, DataConsistencyCheckResult> getCheckJobResult(final
String parentJobId, final String checkJobId) {
Map<String, DataConsistencyCheckResult> result = new HashMap<>();
- String yamlCheckResultMapText =
repository.getDirectly(PipelineMetaDataNode.getCheckJobResultPath(jobId,
checkJobId));
+ String yamlCheckResultMapText =
repository.getDirectly(PipelineMetaDataNode.getCheckJobResultPath(parentJobId,
checkJobId));
if (StringUtils.isBlank(yamlCheckResultMapText)) {
return Collections.emptyMap();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index 3125fc778fc..3547a2a1f48 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -19,6 +19,8 @@ package
org.apache.shardingsphere.data.pipeline.core.check.consistency;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
@@ -26,12 +28,16 @@ import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
+import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -40,7 +46,9 @@ import
org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.
import java.sql.SQLException;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
@@ -72,6 +80,8 @@ public final class SingleTableInventoryDataConsistencyChecker
{
private final JobRateLimitAlgorithm readRateLimitAlgorithm;
+ private final ConsistencyCheckJobItemContext
consistencyCheckJobItemContext;
+
/**
* Data consistency check.
*
@@ -82,26 +92,35 @@ public final class
SingleTableInventoryDataConsistencyChecker {
ThreadFactory threadFactory =
ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobId) +
"-check-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
try {
- return check(calculateAlgorithm, executor);
+ return check(calculateAlgorithm, executor,
consistencyCheckJobItemContext);
} finally {
executor.shutdown();
executor.shutdownNow();
}
}
- private DataConsistencyCheckResult check(final
DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor
executor) {
+ private DataConsistencyCheckResult check(final
DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor
executor,
+ final
ConsistencyCheckJobItemContext consistencyCheckJobItemContext) {
String sourceDatabaseType =
sourceDataSource.getDatabaseType().getType();
String targetDatabaseType =
targetDataSource.getDatabaseType().getType();
String sourceTableName = sourceTable.getTableName().getOriginal();
- PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(sourceTable.getSchemaName().getOriginal(),
sourceTableName);
+ String schemaName = sourceTable.getSchemaName().getOriginal();
+ PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(schemaName, sourceTableName);
ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new
PipelineTableDataConsistencyCheckLoadingFailedException(sourceTableName));
Collection<String> columnNames = tableMetaData.getColumnNames();
DataConsistencyCalculateParameter sourceParameter = buildParameter(
- sourceDataSource, sourceTable.getSchemaName().getOriginal(),
sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType,
uniqueKey);
+ sourceDataSource, schemaName, sourceTableName, columnNames,
sourceDatabaseType, targetDatabaseType, uniqueKey);
DataConsistencyCalculateParameter targetParameter = buildParameter(
targetDataSource, targetTable.getSchemaName().getOriginal(),
targetTable.getTableName().getOriginal(), columnNames, targetDatabaseType,
sourceDatabaseType, uniqueKey);
Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults =
calculateAlgorithm.calculate(sourceParameter).iterator();
Iterator<DataConsistencyCalculatedResult> targetCalculatedResults =
calculateAlgorithm.calculate(targetParameter).iterator();
+ if (null != consistencyCheckJobItemContext) {
+
consistencyCheckJobItemContext.setTableNames(Collections.singletonList(sourceTableName));
+ InventoryIncrementalJobPublicAPI inventoryIncrementalJobPublicAPI
=
PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(PipelineJobIdUtils.parseJobType(jobId).getTypeName());
+ Map<Integer, InventoryIncrementalJobItemProgress> jobProgress =
inventoryIncrementalJobPublicAPI.getJobProgress(jobId);
+ long recordsCount =
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
+ consistencyCheckJobItemContext.setRecordsCount(recordsCount);
+ }
long sourceRecordsCount = 0;
long targetRecordsCount = 0;
boolean contentMatched = true;
@@ -120,6 +139,12 @@ public final class
SingleTableInventoryDataConsistencyChecker {
log.info("content matched false, jobId={}, sourceTable={},
targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
break;
}
+ if (null != consistencyCheckJobItemContext) {
+ consistencyCheckJobItemContext.onProgressUpdated(new
PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+ }
+ }
+ if (null != consistencyCheckJobItemContext) {
+
consistencyCheckJobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
}
return new DataConsistencyCheckResult(new
DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new
DataConsistencyContentCheckResult(contentMatched));
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index 4b87e973cf9..699fc6d1878 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -125,7 +125,7 @@ public final class DefaultImporter extends
AbstractLifecycleExecutor implements
flushInternal(dataSource, each.getInsertDataRecords());
flushInternal(dataSource, each.getUpdateDataRecords());
}
- return new PipelineJobProgressUpdatedParameter(insertRecordNumber,
deleteRecordNumber);
+ return new PipelineJobProgressUpdatedParameter(insertRecordNumber -
deleteRecordNumber);
}
private void flushInternal(final DataSource dataSource, final
List<DataRecord> buffer) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
index f0baaf647c8..d915fce6a1a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
@@ -27,4 +27,14 @@ import
org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
public final class YamlConsistencyCheckJobProgress implements
YamlConfiguration {
private String status;
+
+ private String tableNames;
+
+ private Long checkedRecordsCount;
+
+ private Long recordsCount;
+
+ private Long checkBeginTimeMillis;
+
+ private Long checkEndTimeMillis;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
index 8796af3c6d8..a3f5dcf20b1 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
@@ -30,6 +30,11 @@ public final class YamlConsistencyCheckJobProgressSwapper
implements YamlConfigu
public YamlConsistencyCheckJobProgress swapToYamlConfiguration(final
ConsistencyCheckJobProgress data) {
YamlConsistencyCheckJobProgress result = new
YamlConsistencyCheckJobProgress();
result.setStatus(data.getStatus().name());
+ result.setRecordsCount(data.getRecordsCount());
+ result.setCheckedRecordsCount(data.getCheckedRecordsCount());
+ result.setCheckBeginTimeMillis(data.getCheckBeginTimeMillis());
+ result.setCheckEndTimeMillis(data.getCheckEndTimeMillis());
+ result.setTableNames(data.getTableNames());
return result;
}
@@ -37,6 +42,11 @@ public final class YamlConsistencyCheckJobProgressSwapper
implements YamlConfigu
public ConsistencyCheckJobProgress swapToObject(final
YamlConsistencyCheckJobProgress yamlConfig) {
ConsistencyCheckJobProgress result = new ConsistencyCheckJobProgress();
result.setStatus(JobStatus.valueOf(yamlConfig.getStatus()));
+ result.setRecordsCount(yamlConfig.getRecordsCount());
+ result.setCheckedRecordsCount(yamlConfig.getCheckedRecordsCount());
+ result.setCheckBeginTimeMillis(yamlConfig.getCheckBeginTimeMillis());
+ result.setCheckEndTimeMillis(yamlConfig.getCheckEndTimeMillis());
+ result.setTableNames(yamlConfig.getTableNames());
return result;
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index b86955e252d..cce3ea67e4c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
@@ -55,6 +56,7 @@ public final class ConsistencyCheckJob extends
AbstractPipelineJob implements Si
}
ConsistencyCheckTasksRunner tasksRunner = new
ConsistencyCheckTasksRunner(jobItemContext);
tasksRunner.start();
+
PipelineJobProgressPersistService.addJobProgressPersistContext(checkJobId,
shardingContext.getShardingItem());
getTasksRunnerMap().put(shardingItem, tasksRunner);
}
@@ -74,5 +76,6 @@ public final class ConsistencyCheckJob extends
AbstractPipelineJob implements Si
getTasksRunnerMap().clear();
String jobBarrierDisablePath =
PipelineMetaDataNode.getJobBarrierDisablePath(getJobId());
pipelineDistributedBarrier.persistEphemeralChildrenNode(jobBarrierDisablePath,
0);
+
PipelineJobProgressPersistService.removeJobProgressPersistContext(getJobId());
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index b91daee9f9e..c46efa359f1 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -23,8 +23,6 @@ import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import
org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
@@ -34,6 +32,7 @@ import
org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobProgress;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgressInfo;
import
org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -44,12 +43,19 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNot
import
org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
+import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
/**
@@ -103,8 +109,14 @@ public final class ConsistencyCheckJobAPIImpl extends
AbstractPipelineJobAPIImpl
@Override
public void persistJobItemProgress(final PipelineJobItemContext
jobItemContext) {
+ ConsistencyCheckJobItemContext checkJobItemContext =
(ConsistencyCheckJobItemContext) jobItemContext;
ConsistencyCheckJobProgress jobProgress = new
ConsistencyCheckJobProgress();
jobProgress.setStatus(jobItemContext.getStatus());
+
jobProgress.setCheckedRecordsCount(checkJobItemContext.getCheckedRecordsCount().get());
+ jobProgress.setRecordsCount(checkJobItemContext.getRecordsCount());
+
jobProgress.setCheckBeginTimeMillis(checkJobItemContext.getCheckBeginTimeMillis());
+
jobProgress.setCheckEndTimeMillis(checkJobItemContext.getCheckEndTimeMillis());
+ jobProgress.setTableNames(null == checkJobItemContext.getTableNames()
? null : String.join(",", checkJobItemContext.getTableNames()));
YamlConsistencyCheckJobProgress yamlJobProgress =
swapper.swapToYamlConfiguration(jobProgress);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
}
@@ -115,10 +127,7 @@ public final class ConsistencyCheckJobAPIImpl extends
AbstractPipelineJobAPIImpl
if (StringUtils.isBlank(progress)) {
return null;
}
- ConsistencyCheckJobProgress jobProgress =
swapper.swapToObject(YamlEngine.unmarshal(progress,
YamlConsistencyCheckJobProgress.class, true));
- ConsistencyCheckJobProgress result = new ConsistencyCheckJobProgress();
- result.setStatus(jobProgress.getStatus());
- return result;
+ return swapper.swapToObject(YamlEngine.unmarshal(progress,
YamlConsistencyCheckJobProgress.class, true));
}
@Override
@@ -156,6 +165,46 @@ public final class ConsistencyCheckJobAPIImpl extends
AbstractPipelineJobAPIImpl
stop(checkLatestJobId.get());
}
+ @Override
+ public ConsistencyCheckJobProgressInfo getJobProgressInfo(final String
parentJobId) {
+ Optional<String> checkLatestJobId =
PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+ ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(),
() -> new PipelineJobNotFoundException(parentJobId));
+ String checkJobId = checkLatestJobId.get();
+ ConsistencyCheckJobProgress jobItemProgress =
getJobItemProgress(checkJobId, 0);
+ ConsistencyCheckJobProgressInfo result = new
ConsistencyCheckJobProgressInfo();
+ if (null == jobItemProgress) {
+ return result;
+ }
+ int finishedPercentage;
+ LocalDateTime checkBeginTime = new
Timestamp(jobItemProgress.getCheckBeginTimeMillis()).toLocalDateTime();
+ if (null != jobItemProgress.getRecordsCount() &&
Objects.equals(jobItemProgress.getCheckedRecordsCount(),
jobItemProgress.getRecordsCount())) {
+ finishedPercentage = 100;
+ LocalDateTime checkEndTime = new
Timestamp(jobItemProgress.getCheckEndTimeMillis()).toLocalDateTime();
+ Duration duration = Duration.between(checkBeginTime, checkEndTime);
+ result.setDurationSeconds(duration.toMillis() / 1000);
+ result.setCheckEndTime(DATE_TIME_FORMATTER.format(checkEndTime));
+ result.setRemainingSeconds(0L);
+ } else {
+ if (null == jobItemProgress.getRecordsCount()) {
+ finishedPercentage = 0;
+ } else {
+ finishedPercentage = Math.min(100,
BigDecimal.valueOf(Math.floorDiv(jobItemProgress.getCheckedRecordsCount() *
100, jobItemProgress.getRecordsCount())).intValue());
+ Duration duration = Duration.between(checkBeginTime,
LocalDateTime.now());
+ long remainMills = jobItemProgress.getRecordsCount() * 100 /
jobItemProgress.getCheckedRecordsCount() * duration.toMillis();
+ result.setRemainingSeconds(remainMills / 1000);
+ }
+ }
+ result.setFinishedPercentage(finishedPercentage);
+ String tableName = null == jobItemProgress.getTableNames() ? null :
jobItemProgress.getTableNames().split(",")[0];
+ result.setTableName(Optional.ofNullable(tableName).orElse(""));
+ result.setCheckBeginTime(DATE_TIME_FORMATTER.format(checkBeginTime));
+ result.setErrorMessage(getJobItemErrorMessage(checkJobId, 0));
+ Map<String, DataConsistencyCheckResult> checkJobResult =
PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckJobResult(parentJobId,
checkJobId);
+ Optional<DataConsistencyCheckResult> dataConsistencyCheckResult =
Optional.ofNullable(checkJobResult.get(tableName));
+ dataConsistencyCheckResult.ifPresent(optional ->
result.setResult(optional.getContentCheckResult().isMatched()));
+ return result;
+ }
+
@Override
public ConsistencyCheckJobConfiguration getJobConfiguration(final String
jobId) {
return getJobConfiguration(getElasticJobConfigPOJO(jobId));
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
index fb5841d9fde..7b812529dd6 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
@@ -24,6 +24,12 @@ import
org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJo
import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Consistency check job item context.
@@ -31,7 +37,7 @@ import
org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@Getter
@Setter
@Slf4j
-public final class ConsistencyCheckJobItemContext implements
PipelineJobItemContext {
+public final class ConsistencyCheckJobItemContext implements
PipelineJobItemContext, PipelineJobProgressListener {
private final String jobId;
@@ -43,6 +49,16 @@ public final class ConsistencyCheckJobItemContext implements
PipelineJobItemCont
private volatile JobStatus status;
+ private Collection<String> tableNames;
+
+ private volatile Long recordsCount;
+
+ private final AtomicLong checkedRecordsCount = new AtomicLong(0);
+
+ private final long checkBeginTimeMillis;
+
+ private Long checkEndTimeMillis;
+
private final ConsistencyCheckJobConfiguration jobConfig;
public ConsistencyCheckJobItemContext(final
ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final
JobStatus status) {
@@ -50,10 +66,17 @@ public final class ConsistencyCheckJobItemContext
implements PipelineJobItemCont
jobId = jobConfig.getJobId();
this.shardingItem = shardingItem;
this.status = status;
+ checkBeginTimeMillis = System.currentTimeMillis();
}
@Override
public PipelineProcessContext getJobProcessContext() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void onProgressUpdated(final PipelineJobProgressUpdatedParameter
parameter) {
+ checkedRecordsCount.addAndGet(parameter.getProcessedRecordsCount());
+ PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
index 69055b5a85d..a13093312a4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
@@ -99,8 +99,9 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
DataConsistencyCalculateAlgorithm calculateAlgorithm =
jobAPI.buildDataConsistencyCalculateAlgorithm(
parentJobConfig, checkJobConfig.getAlgorithmTypeName(),
checkJobConfig.getAlgorithmProps());
this.calculateAlgorithm = calculateAlgorithm;
- Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult
= jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm);
+ Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult
= jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm,
jobItemContext);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId,
checkJobId, dataConsistencyCheckResult);
+ jobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index 50ab3a39afb..91fa69b6fdc 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -34,6 +34,7 @@ import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -57,11 +58,15 @@ public final class MigrationDataConsistencyChecker
implements PipelineDataConsis
private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
- public MigrationDataConsistencyChecker(final MigrationJobConfiguration
jobConfig, final InventoryIncrementalProcessContext processContext) {
+ private final ConsistencyCheckJobItemContext checkJobItemContext;
+
+ public MigrationDataConsistencyChecker(final MigrationJobConfiguration
jobConfig, final InventoryIncrementalProcessContext processContext,
+ final
ConsistencyCheckJobItemContext checkJobItemContext) {
this.jobConfig = jobConfig;
readRateLimitAlgorithm = null != processContext ?
processContext.getReadRateLimitAlgorithm() : null;
tableNameSchemaNameMapping = new TableNameSchemaNameMapping(
TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(), new
HashSet<>(Arrays.asList(jobConfig.getSourceTableName(),
jobConfig.getTargetTableName()))));
+ this.checkJobItemContext = checkJobItemContext;
}
@Override
@@ -77,8 +82,8 @@ public final class MigrationDataConsistencyChecker implements
PipelineDataConsis
PipelineDataSourceWrapper sourceDataSource =
PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
PipelineDataSourceWrapper targetDataSource =
PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
PipelineTableMetaDataLoader metaDataLoader = new
StandardPipelineTableMetaDataLoader(sourceDataSource);
- SingleTableInventoryDataConsistencyChecker
singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(
- jobConfig.getJobId(), sourceDataSource, targetDataSource,
sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader,
readRateLimitAlgorithm);
+ SingleTableInventoryDataConsistencyChecker
singleTableInventoryChecker = new
SingleTableInventoryDataConsistencyChecker(jobConfig.getJobId(),
sourceDataSource, targetDataSource,
+ sourceTable, targetTable, jobConfig.getUniqueKeyColumn(),
metaDataLoader, readRateLimitAlgorithm, checkJobItemContext);
result.put(sourceTable.getTableName().getOriginal(),
singleTableInventoryChecker.check(calculateAlgorithm));
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index d73e6c07c47..72fe7f49559 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -63,6 +63,7 @@ import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTabl
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
@@ -241,8 +242,9 @@ public final class MigrationJobAPIImpl extends
AbstractInventoryIncrementalJobAP
}
@Override
- protected PipelineDataConsistencyChecker
buildPipelineDataConsistencyChecker(final PipelineJobConfiguration
pipelineJobConfig, final InventoryIncrementalProcessContext processContext) {
- return new MigrationDataConsistencyChecker((MigrationJobConfiguration)
pipelineJobConfig, processContext);
+ protected PipelineDataConsistencyChecker
buildPipelineDataConsistencyChecker(final PipelineJobConfiguration
pipelineJobConfig, final InventoryIncrementalProcessContext processContext,
+
final ConsistencyCheckJobItemContext checkJobItemContext) {
+ return new MigrationDataConsistencyChecker((MigrationJobConfiguration)
pipelineJobConfig, processContext, checkJobItemContext);
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index da3eb7b8203..35651826bd2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -132,8 +132,7 @@ public final class MigrationJobItemContext implements
InventoryIncrementalJobIte
@Override
public void onProgressUpdated(final PipelineJobProgressUpdatedParameter
parameter) {
- int needAddNumber = parameter.getInsertedRecordsCount() -
parameter.getDeletedRecordsCount();
- processedRecordsCount.addAndGet(needAddNumber);
+ processedRecordsCount.addAndGet(parameter.getProcessedRecordsCount());
PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
}
diff --git
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 644917216ba..f269624890e 100644
---
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -22,6 +22,7 @@ import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
@@ -238,7 +239,7 @@ public abstract class BaseITCase {
ResultSet resultSet =
connection.createStatement().executeQuery(sql);
List<Map<String, Object>> result = resultSetToList(resultSet);
log.info("proxy query for list, sql: {}, result: {}", sql,
result);
- return result;
+ return ObjectUtils.defaultIfNull(result,
Collections.emptyList());
} catch (final SQLException ex) {
log.error("data access error", ex);
}
diff --git
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index 5032f649681..a04d7224de4 100644
---
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.integration.data.pipeline.cases.migration;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import
org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseITCase;
import
org.apache.shardingsphere.integration.data.pipeline.command.MigrationDistSQLCommand;
@@ -164,19 +165,19 @@ public abstract class AbstractMigrationITCase extends
BaseITCase {
protected void assertCheckMigrationSuccess(final String jobId, final
String algorithmType) throws SQLException {
proxyExecuteWithLog(String.format("CHECK MIGRATION '%s' BY TYPE
(NAME='%s')", jobId, algorithmType), 0);
- List<Map<String, Object>> checkJobResults = Collections.emptyList();
+ List<Map<String, Object>> resultList = Collections.emptyList();
for (int i = 0; i < 10; i++) {
- checkJobResults = queryForListWithLog(String.format("SHOW
MIGRATION CHECK STATUS '%s'", jobId));
- if (null != checkJobResults && !checkJobResults.isEmpty()) {
+ resultList = queryForListWithLog(String.format("SHOW MIGRATION
CHECK STATUS '%s'", jobId));
+ List<String> checkEndTimeList = resultList.stream().map(map ->
map.get("check_end_time").toString()).filter(StringUtils::isNotBlank).collect(Collectors.toList());
+ if (checkEndTimeList.size() == resultList.size()) {
break;
}
ThreadUtil.sleep(5, TimeUnit.SECONDS);
}
- assertTrue(null != checkJobResults && !checkJobResults.isEmpty());
- log.info("check job results: {}", checkJobResults);
- for (Map<String, Object> entry : checkJobResults) {
-
assertTrue(Boolean.parseBoolean(entry.get("records_count_matched").toString()));
-
assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
+ log.info("check job results: {}", resultList);
+ for (Map<String, Object> entry : resultList) {
+ assertTrue(Boolean.parseBoolean(entry.get("result").toString()));
+ assertThat(entry.get("finished_percentage").toString(), is("100"));
}
}
}
diff --git
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index f8dd4794b7b..9cd96221ed8 100644
---
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -29,8 +29,8 @@ import
org.apache.shardingsphere.data.pipeline.api.job.JobType;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
-import
org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import
org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
diff --git
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
index fa646106a05..1b008f63e67 100644
---
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
+++
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
@@ -18,12 +18,19 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.fixture.DataConsistencyCalculateAlgorithmFixture;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -31,6 +38,7 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
+import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -46,13 +54,22 @@ public final class MigrationDataConsistencyCheckerTest {
@Test
public void assertCountAndDataCheck() throws SQLException {
MigrationJobConfiguration jobConfig = createJobConfiguration();
- Map<String, DataConsistencyCheckResult> actual = new
MigrationDataConsistencyChecker(jobConfig, new
MigrationProcessContext(jobConfig.getJobId(), null))
- .check(new DataConsistencyCalculateAlgorithmFixture());
+ JobConfigurationPOJO jobConfigurationPOJO = new JobConfigurationPOJO();
+ jobConfigurationPOJO.setJobParameter(YamlEngine.marshal(new
YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
+
PipelineAPIFactory.getGovernanceRepositoryAPI().persist(String.format("/pipeline/jobs/%s/config",
jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
+
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobConfig.getJobId(),
0, "");
+ Map<String, DataConsistencyCheckResult> actual = new
MigrationDataConsistencyChecker(jobConfig, new
MigrationProcessContext(jobConfig.getJobId(), null),
+ createConsistencyCheckJobItemConfig()).check(new
DataConsistencyCalculateAlgorithmFixture());
assertTrue(actual.get("t_order").getCountCheckResult().isMatched());
assertThat(actual.get("t_order").getCountCheckResult().getSourceRecordsCount(),
is(actual.get("t_order").getCountCheckResult().getTargetRecordsCount()));
assertTrue(actual.get("t_order").getContentCheckResult().isMatched());
}
+ private ConsistencyCheckJobItemContext
createConsistencyCheckJobItemConfig() {
+ ConsistencyCheckJobConfiguration jobConfig = new
ConsistencyCheckJobConfiguration("", "", "", new Properties());
+ return new ConsistencyCheckJobItemContext(jobConfig, 0,
JobStatus.RUNNING);
+ }
+
private MigrationJobConfiguration createJobConfiguration() throws
SQLException {
MigrationJobItemContext jobItemContext =
PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
initTableData(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig());