sandynz commented on code in PR #21441:
URL: https://github.com/apache/shardingsphere/pull/21441#discussion_r991202520
##########
features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java:
##########
@@ -46,20 +44,18 @@ public final class ShowMigrationCheckStatusQueryResultSet
implements DatabaseDis
@Override
public void init(final ShardingSphereDatabase database, final SQLStatement
sqlStatement) {
ShowMigrationCheckStatusStatement checkMigrationStatement =
(ShowMigrationCheckStatusStatement) sqlStatement;
- Map<String, DataConsistencyCheckResult> consistencyCheckResult =
JOB_API.getLatestDataConsistencyCheckResult(checkMigrationStatement.getJobId());
- List<Collection<Object>> result = new
ArrayList<>(consistencyCheckResult.size());
- for (Entry<String, DataConsistencyCheckResult> entry :
consistencyCheckResult.entrySet()) {
- DataConsistencyCheckResult value = entry.getValue();
- DataConsistencyCountCheckResult countCheckResult =
value.getCountCheckResult();
- result.add(Arrays.asList(entry.getKey(),
countCheckResult.getSourceRecordsCount(),
countCheckResult.getTargetRecordsCount(),
String.valueOf(countCheckResult.isMatched()),
-
String.valueOf(value.getContentCheckResult().isMatched())));
- }
+ ConsistencyCheckJobProgressInfo progressInfo =
JOB_API.getJobProgressInfo(checkMigrationStatement.getJobId());
+ List<Collection<Object>> result = new LinkedList<>();
+ String checkResult = null == progressInfo.getCheckResult() ? "" :
progressInfo.getCheckResult().toString();
+ result.add(Arrays.asList(progressInfo.getTableName(), checkResult,
String.valueOf(progressInfo.getInventoryFinishedPercentage()),
+ ObjectUtils.defaultIfNull(progressInfo.getRemainingTime(),
""), progressInfo.getCheckBeginTime(),
ObjectUtils.defaultIfNull(progressInfo.getCheckEndTime(), ""),
+ ObjectUtils.defaultIfNull(progressInfo.getCheckDuration(),
""), progressInfo.getErrorMessage()));
data = result.iterator();
}
@Override
public Collection<String> getColumnNames() {
- return Arrays.asList("table_name", "source_records_count",
"target_records_count", "records_count_matched", "records_content_matched");
+ return Arrays.asList("table_name", "check_result",
"inventory_finished_percentage", "remaining_time", "check_begin_time",
"check_end_time", "check_duration", "error_message");
Review Comment:
Column names could be improved:
- Remove `check_` prefix
- `inventory_finished_percentage` could be `finished_percentage`
- `remaining_time` could be `remaining_seconds`
- `check_duration` could be `duration_seconds`
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java:
##########
@@ -76,32 +84,41 @@ public final class
SingleTableInventoryDataConsistencyChecker {
* Data consistency check.
*
* @param calculateAlgorithm calculate algorithm
+ * @param consistencyCheckJobItemContext job progress listener
* @return data consistency check result
*/
- public DataConsistencyCheckResult check(final
DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+ public DataConsistencyCheckResult check(final
DataConsistencyCalculateAlgorithm calculateAlgorithm, final
ConsistencyCheckJobItemContext consistencyCheckJobItemContext) {
ThreadFactory threadFactory =
ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobId) +
"-check-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
try {
- return check(calculateAlgorithm, executor);
+ return check(calculateAlgorithm, executor,
consistencyCheckJobItemContext);
} finally {
executor.shutdown();
executor.shutdownNow();
}
}
- private DataConsistencyCheckResult check(final
DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor
executor) {
+ private DataConsistencyCheckResult check(final
DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor
executor,
+ final
ConsistencyCheckJobItemContext consistencyCheckJobItemContext) {
String sourceDatabaseType =
sourceDataSource.getDatabaseType().getType();
String targetDatabaseType =
targetDataSource.getDatabaseType().getType();
String sourceTableName = sourceTable.getTableName().getOriginal();
- PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(sourceTable.getSchemaName().getOriginal(),
sourceTableName);
+ consistencyCheckJobItemContext.setTableName(sourceTableName);
+ String schemeName = sourceTable.getSchemaName().getOriginal();
+ PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(schemeName, sourceTableName);
ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new
PipelineTableDataConsistencyCheckLoadingFailedException(sourceTableName));
Collection<String> columnNames = tableMetaData.getColumnNames();
DataConsistencyCalculateParameter sourceParameter = buildParameter(
- sourceDataSource, sourceTable.getSchemaName().getOriginal(),
sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType,
uniqueKey);
+ sourceDataSource, schemeName, sourceTableName, columnNames,
sourceDatabaseType, targetDatabaseType, uniqueKey);
DataConsistencyCalculateParameter targetParameter = buildParameter(
targetDataSource, targetTable.getSchemaName().getOriginal(),
targetTable.getTableName().getOriginal(), columnNames, targetDatabaseType,
sourceDatabaseType, uniqueKey);
Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults =
calculateAlgorithm.calculate(sourceParameter).iterator();
Iterator<DataConsistencyCalculatedResult> targetCalculatedResults =
calculateAlgorithm.calculate(targetParameter).iterator();
+ executor.submit(() -> {
+ // TODO use select count may take too long and cause a timeout
+ long sourceRecordsCount = count(sourceDataSource, schemeName,
sourceTableName, sourceDataSource.getDatabaseType());
+ consistencyCheckJobItemContext.setRecordsCount(sourceRecordsCount);
+ });
Review Comment:
1, `executor` has only `2` max threads and a blocking queue of capacity `2`, so
the 3rd submit might be blocked and might time out (60 seconds).
2, Could we reuse the records count in
`InventoryIncrementalJobItemContext.getProcessedRecordsCount()`? Though it's
not totally accurate, especially when the user does a consistency check before
the inventory dump is done.
3, If option 2 could not be done, then we could calculate the records count
in a blocking way for now.
##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java:
##########
@@ -97,17 +98,19 @@ public interface InventoryIncrementalJobPublicAPI extends
PipelineJobPublicAPI,
* Do data consistency check.
*
* @param jobId job id
+ * @param jobProgressListener job progress listener
* @return each logic table check result
*/
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId);
+ Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId,
PipelineJobProgressListener jobProgressListener);
/**
* Do data consistency check.
*
* @param jobId job id
* @param algorithmType algorithm type
* @param algorithmProps algorithm props. Nullable
+ * @param jobProgressListener consistency check job progress listener
* @return each logic table check result
*/
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId,
String algorithmType, Properties algorithmProps);
+ Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId,
String algorithmType, Properties algorithmProps, PipelineJobProgressListener
jobProgressListener);
Review Comment:
Could we remove the 2nd parameter `PipelineJobProgressListener
jobProgressListener`? Since it's not necessary
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java:
##########
@@ -76,32 +84,41 @@ public final class
SingleTableInventoryDataConsistencyChecker {
* Data consistency check.
*
* @param calculateAlgorithm calculate algorithm
+ * @param consistencyCheckJobItemContext job progress listener
* @return data consistency check result
*/
- public DataConsistencyCheckResult check(final
DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+ public DataConsistencyCheckResult check(final
DataConsistencyCalculateAlgorithm calculateAlgorithm, final
ConsistencyCheckJobItemContext consistencyCheckJobItemContext) {
Review Comment:
Parameter `ConsistencyCheckJobItemContext consistencyCheckJobItemContext`
could be a class field instead of a method parameter
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java:
##########
@@ -241,8 +243,9 @@ public MigrationProcessContext
buildPipelineProcessContext(final PipelineJobConf
}
@Override
- protected PipelineDataConsistencyChecker
buildPipelineDataConsistencyChecker(final PipelineJobConfiguration
pipelineJobConfig, final InventoryIncrementalProcessContext processContext) {
- return new MigrationDataConsistencyChecker((MigrationJobConfiguration)
pipelineJobConfig, processContext);
+ protected PipelineDataConsistencyChecker
buildPipelineDataConsistencyChecker(final PipelineJobConfiguration
pipelineJobConfig, final InventoryIncrementalProcessContext processContext,
+
final PipelineJobProgressListener jobProgressListener) {
+ return new MigrationDataConsistencyChecker((MigrationJobConfiguration)
pipelineJobConfig, processContext, (ConsistencyCheckJobItemContext)
jobProgressListener);
Review Comment:
The class cast is strange, it's better to remove it
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java:
##########
@@ -156,6 +165,45 @@ public void stopByParentJobId(final String parentJobId) {
stop(checkLatestJobId.get());
}
+ @Override
+ public ConsistencyCheckJobProgressInfo getJobProgressInfo(final String
parentJobId) {
+ Optional<String> checkLatestJobId =
PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+ ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(),
() -> new PipelineJobNotFoundException(parentJobId));
+ String checkJobId = checkLatestJobId.get();
+ ConsistencyCheckJobProgress jobItemProgress =
getJobItemProgress(checkJobId, 0);
+ ConsistencyCheckJobProgressInfo result = new
ConsistencyCheckJobProgressInfo();
+ if (null == jobItemProgress) {
+ return result;
+ }
+ int inventoryFinishedPercentage;
+ LocalDateTime checkBeginTime = new
Timestamp(jobItemProgress.getCheckBeginTimeMillis()).toLocalDateTime();
+ if (null != jobItemProgress.getRecordsCount() &&
Objects.equals(jobItemProgress.getCheckedRecordsCount(),
jobItemProgress.getRecordsCount())) {
+ inventoryFinishedPercentage = 100;
+ LocalDateTime checkEndTime = new
Timestamp(jobItemProgress.getCheckEndTimeMillis()).toLocalDateTime();
+ Duration duration = Duration.between(checkBeginTime, checkEndTime);
+ result.setCheckDuration(duration.toMillis() / 1000);
+ result.setCheckEndTime(DATE_TIME_FORMATTER.format(checkEndTime));
+ result.setRemainingTime(0L);
+ } else {
+ if (null == jobItemProgress.getRecordsCount()) {
+ inventoryFinishedPercentage = 0;
+ } else {
+ inventoryFinishedPercentage =
BigDecimal.valueOf(Math.floorDiv(jobItemProgress.getCheckedRecordsCount() *
100, jobItemProgress.getRecordsCount())).intValue();
Review Comment:
`getCheckedRecordsCount` might be greater than `getRecordsCount`, but the
percentage should not be greater than 100
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java:
##########
@@ -43,17 +48,34 @@ public final class ConsistencyCheckJobItemContext
implements PipelineJobItemCont
private volatile JobStatus status;
+ private String tableName;
+
+ private volatile Long recordsCount;
+
+ private final AtomicLong checkedRecordsCount = new AtomicLong(0);
+
+ private final long checkBeginTimeMillis;
+
+ private Long checkEndTimeMillis;
+
private final ConsistencyCheckJobConfiguration jobConfig;
public ConsistencyCheckJobItemContext(final
ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final
JobStatus status) {
this.jobConfig = jobConfig;
jobId = jobConfig.getJobId();
this.shardingItem = shardingItem;
this.status = status;
+ this.checkBeginTimeMillis = System.currentTimeMillis();
Review Comment:
`this.` prefix is not necessary
##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.pojo;
+
+import lombok.Data;
+
+/**
+ * Consistency check jon progress info.
+ */
+
+@Data
+public final class ConsistencyCheckJobProgressInfo {
+
+ private String tableName;
+
+ private Boolean checkResult;
+
+ private int inventoryFinishedPercentage;
+
+ private Long remainingTime;
+
+ private String checkBeginTime;
+
+ private String checkEndTime;
+
+ private Long checkDuration;
Review Comment:
These field names could be updated to keep consistency with the DistSQL response
column names.
And also the related yaml class.
And also `ConsistencyCheckJobProgress`.
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java:
##########
@@ -99,8 +100,10 @@ protected void runBlocking() {
DataConsistencyCalculateAlgorithm calculateAlgorithm =
jobAPI.buildDataConsistencyCalculateAlgorithm(
parentJobConfig, checkJobConfig.getAlgorithmTypeName(),
checkJobConfig.getAlgorithmProps());
this.calculateAlgorithm = calculateAlgorithm;
- Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult
= jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm);
+
PipelineJobProgressPersistService.addJobProgressPersistContext(checkJobId,
jobItemContext.getShardingItem());
Review Comment:
`addJobProgressPersistContext` already exists in `ConsistencyCheckJob`, so it
looks like it's duplicated here
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java:
##########
@@ -125,6 +128,7 @@ public void onSuccess() {
jobItemContext.setStatus(JobStatus.FINISHED);
checkJobAPI.persistJobItemProgress(jobItemContext);
checkJobAPI.stop(checkJobId);
+
}
Review Comment:
Blank line in method should be removed
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java:
##########
@@ -56,9 +57,11 @@ public interface InventoryIncrementalJobAPI extends
PipelineJobAPI {
*
* @param pipelineJobConfig job configuration
* @param calculateAlgorithm calculate algorithm
+ * @param checkJobProgressListener consistency check job progress listener
* @return each logic table check result
*/
- Map<String, DataConsistencyCheckResult>
dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig,
DataConsistencyCalculateAlgorithm calculateAlgorithm);
+ Map<String, DataConsistencyCheckResult>
dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig,
DataConsistencyCalculateAlgorithm calculateAlgorithm,
+
PipelineJobProgressListener checkJobProgressListener);
Review Comment:
`PipelineJobProgressListener jobProgressListener` in this interface is
confusing, since there's already PipelineJobProgressListener for inventory job.
Could we just use `ConsistencyCheckJobItemContext`?
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java:
##########
@@ -57,11 +58,15 @@ public final class MigrationDataConsistencyChecker
implements PipelineDataConsis
private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
- public MigrationDataConsistencyChecker(final MigrationJobConfiguration
jobConfig, final InventoryIncrementalProcessContext processContext) {
+ private final ConsistencyCheckJobItemContext
consistencyCheckJobItemContext;
Review Comment:
`consistencyCheckJobItemContext` could be shorter, e.g. `checkJobItemContext`
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java:
##########
@@ -43,17 +48,34 @@ public final class ConsistencyCheckJobItemContext
implements PipelineJobItemCont
private volatile JobStatus status;
+ private String tableName;
Review Comment:
It's better to use `Collection<String> tableNames` to replace `String
tableName`, since there might be several tables
##########
test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java:
##########
@@ -167,16 +168,16 @@ protected void assertCheckMigrationSuccess(final String
jobId, final String algo
List<Map<String, Object>> checkJobResults = Collections.emptyList();
for (int i = 0; i < 10; i++) {
checkJobResults = queryForListWithLog(String.format("SHOW
MIGRATION CHECK STATUS '%s'", jobId));
- if (null != checkJobResults && !checkJobResults.isEmpty()) {
+ List<String> checkEndTimeList = checkJobResults.stream().map(map
->
map.get("check_end_time").toString()).filter(StringUtils::isNotBlank).collect(Collectors.toList());
+ if (checkEndTimeList.size() == checkJobResults.size()) {
break;
}
ThreadUtil.sleep(5, TimeUnit.SECONDS);
}
- assertTrue(null != checkJobResults && !checkJobResults.isEmpty());
log.info("check job results: {}", checkJobResults);
for (Map<String, Object> entry : checkJobResults) {
-
assertTrue(Boolean.parseBoolean(entry.get("records_count_matched").toString()));
-
assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
+
assertTrue(Boolean.parseBoolean(entry.get("check_result").toString()));
+ assertThat(entry.get("inventory_finished_percentage").toString(),
is("100"));
Review Comment:
These DistSQL response column names could be updated to keep consistency
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java:
##########
@@ -156,6 +165,45 @@ public void stopByParentJobId(final String parentJobId) {
stop(checkLatestJobId.get());
}
+ @Override
+ public ConsistencyCheckJobProgressInfo getJobProgressInfo(final String
parentJobId) {
+ Optional<String> checkLatestJobId =
PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+ ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(),
() -> new PipelineJobNotFoundException(parentJobId));
+ String checkJobId = checkLatestJobId.get();
+ ConsistencyCheckJobProgress jobItemProgress =
getJobItemProgress(checkJobId, 0);
+ ConsistencyCheckJobProgressInfo result = new
ConsistencyCheckJobProgressInfo();
+ if (null == jobItemProgress) {
+ return result;
+ }
+ int inventoryFinishedPercentage;
Review Comment:
Variable names could be improved to keep consistency with the DistSQL response
column names,
e.g. inventoryFinishedPercentage, and also the others.
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java:
##########
@@ -27,4 +27,14 @@
public final class YamlConsistencyCheckJobProgress implements
YamlConfiguration {
private String status;
+
+ private String tableName;
+
Review Comment:
It's better to use `tableNames`, since there might be several tables
##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.pojo;
+
+import lombok.Data;
+
+/**
+ * Consistency check jon progress info.
+ */
+
+@Data
+public final class ConsistencyCheckJobProgressInfo {
+
+ private String tableName;
+
Review Comment:
It's better to use `tableNames`, since there might be several tables
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]