sandynz commented on code in PR #21441:
URL: https://github.com/apache/shardingsphere/pull/21441#discussion_r991202520


##########
features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java:
##########
@@ -46,20 +44,18 @@ public final class ShowMigrationCheckStatusQueryResultSet 
implements DatabaseDis
     @Override
     public void init(final ShardingSphereDatabase database, final SQLStatement 
sqlStatement) {
         ShowMigrationCheckStatusStatement checkMigrationStatement = 
(ShowMigrationCheckStatusStatement) sqlStatement;
-        Map<String, DataConsistencyCheckResult> consistencyCheckResult = 
JOB_API.getLatestDataConsistencyCheckResult(checkMigrationStatement.getJobId());
-        List<Collection<Object>> result = new 
ArrayList<>(consistencyCheckResult.size());
-        for (Entry<String, DataConsistencyCheckResult> entry : 
consistencyCheckResult.entrySet()) {
-            DataConsistencyCheckResult value = entry.getValue();
-            DataConsistencyCountCheckResult countCheckResult = 
value.getCountCheckResult();
-            result.add(Arrays.asList(entry.getKey(), 
countCheckResult.getSourceRecordsCount(), 
countCheckResult.getTargetRecordsCount(), 
String.valueOf(countCheckResult.isMatched()),
-                    
String.valueOf(value.getContentCheckResult().isMatched())));
-        }
+        ConsistencyCheckJobProgressInfo progressInfo = 
JOB_API.getJobProgressInfo(checkMigrationStatement.getJobId());
+        List<Collection<Object>> result = new LinkedList<>();
+        String checkResult = null == progressInfo.getCheckResult() ? "" : 
progressInfo.getCheckResult().toString();
+        result.add(Arrays.asList(progressInfo.getTableName(), checkResult, 
String.valueOf(progressInfo.getInventoryFinishedPercentage()),
+                ObjectUtils.defaultIfNull(progressInfo.getRemainingTime(), 
""), progressInfo.getCheckBeginTime(), 
ObjectUtils.defaultIfNull(progressInfo.getCheckEndTime(), ""),
+                ObjectUtils.defaultIfNull(progressInfo.getCheckDuration(), 
""), progressInfo.getErrorMessage()));
         data = result.iterator();
     }
     
     @Override
     public Collection<String> getColumnNames() {
-        return Arrays.asList("table_name", "source_records_count", 
"target_records_count", "records_count_matched", "records_content_matched");
+        return Arrays.asList("table_name", "check_result", 
"inventory_finished_percentage", "remaining_time", "check_begin_time", 
"check_end_time", "check_duration", "error_message");

Review Comment:
   Column names could be improved:
   - Remove `check_` prefix
   - `inventory_finished_percentage` could be `finished_percentage`
   - `remaining_time` could be `remaining_seconds`
   - `check_duration` could be `duration_seconds`
   



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java:
##########
@@ -76,32 +84,41 @@ public final class 
SingleTableInventoryDataConsistencyChecker {
      * Data consistency check.
      *
      * @param calculateAlgorithm calculate algorithm
+     * @param consistencyCheckJobItemContext job progress listener
      * @return data consistency check result
      */
-    public DataConsistencyCheckResult check(final 
DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+    public DataConsistencyCheckResult check(final 
DataConsistencyCalculateAlgorithm calculateAlgorithm, final 
ConsistencyCheckJobItemContext consistencyCheckJobItemContext) {
         ThreadFactory threadFactory = 
ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobId) + 
"-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, 
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         try {
-            return check(calculateAlgorithm, executor);
+            return check(calculateAlgorithm, executor, 
consistencyCheckJobItemContext);
         } finally {
             executor.shutdown();
             executor.shutdownNow();
         }
     }
     
-    private DataConsistencyCheckResult check(final 
DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor 
executor) {
+    private DataConsistencyCheckResult check(final 
DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor 
executor,
+                                             final 
ConsistencyCheckJobItemContext consistencyCheckJobItemContext) {
         String sourceDatabaseType = 
sourceDataSource.getDatabaseType().getType();
         String targetDatabaseType = 
targetDataSource.getDatabaseType().getType();
         String sourceTableName = sourceTable.getTableName().getOriginal();
-        PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(sourceTable.getSchemaName().getOriginal(), 
sourceTableName);
+        consistencyCheckJobItemContext.setTableName(sourceTableName);
+        String schemeName = sourceTable.getSchemaName().getOriginal();
+        PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(schemeName, sourceTableName);
         ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new 
PipelineTableDataConsistencyCheckLoadingFailedException(sourceTableName));
         Collection<String> columnNames = tableMetaData.getColumnNames();
         DataConsistencyCalculateParameter sourceParameter = buildParameter(
-                sourceDataSource, sourceTable.getSchemaName().getOriginal(), 
sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, 
uniqueKey);
+                sourceDataSource, schemeName, sourceTableName, columnNames, 
sourceDatabaseType, targetDatabaseType, uniqueKey);
         DataConsistencyCalculateParameter targetParameter = buildParameter(
                 targetDataSource, targetTable.getSchemaName().getOriginal(), 
targetTable.getTableName().getOriginal(), columnNames, targetDatabaseType, 
sourceDatabaseType, uniqueKey);
         Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = 
calculateAlgorithm.calculate(sourceParameter).iterator();
         Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = 
calculateAlgorithm.calculate(targetParameter).iterator();
+        executor.submit(() -> {
+            // TODO use select count may take too long and cause a timeout
+            long sourceRecordsCount = count(sourceDataSource, schemeName, 
sourceTableName, sourceDataSource.getDatabaseType());
+            consistencyCheckJobItemContext.setRecordsCount(sourceRecordsCount);
+        });

Review Comment:
   1. `executor` has only `2` max threads and a blocking queue with capacity `2`, so 
the 3rd submit might be blocked and might time out (60 seconds).
   
   2. Could we reuse the records count in 
`InventoryIncrementalJobItemContext.getProcessedRecordsCount()`? Though it's 
not entirely accurate, especially when the user runs a consistency check before the inventory 
dump is done.
   
   3. If option 2 can't be done, then we could calculate the records count 
synchronously (blocking) for now.
   



##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java:
##########
@@ -97,17 +98,19 @@ public interface InventoryIncrementalJobPublicAPI extends 
PipelineJobPublicAPI,
      * Do data consistency check.
      *
      * @param jobId job id
+     * @param jobProgressListener job progress listener
      * @return each logic table check result
      */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId);
+    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId, 
PipelineJobProgressListener jobProgressListener);
     
     /**
      * Do data consistency check.
      *
      * @param jobId job id
      * @param algorithmType algorithm type
      * @param algorithmProps algorithm props. Nullable
+     * @param jobProgressListener consistency check job progress listener
      * @return each logic table check result
      */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId, 
String algorithmType, Properties algorithmProps);
+    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId, 
String algorithmType, Properties algorithmProps, PipelineJobProgressListener 
jobProgressListener);

Review Comment:
   Could we remove the 2nd parameter `PipelineJobProgressListener 
jobProgressListener`, since it's not necessary?



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java:
##########
@@ -76,32 +84,41 @@ public final class 
SingleTableInventoryDataConsistencyChecker {
      * Data consistency check.
      *
      * @param calculateAlgorithm calculate algorithm
+     * @param consistencyCheckJobItemContext job progress listener
      * @return data consistency check result
      */
-    public DataConsistencyCheckResult check(final 
DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+    public DataConsistencyCheckResult check(final 
DataConsistencyCalculateAlgorithm calculateAlgorithm, final 
ConsistencyCheckJobItemContext consistencyCheckJobItemContext) {

Review Comment:
   Parameter `ConsistencyCheckJobItemContext consistencyCheckJobItemContext` 
could be a class field rather than a method parameter.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java:
##########
@@ -241,8 +243,9 @@ public MigrationProcessContext 
buildPipelineProcessContext(final PipelineJobConf
     }
     
     @Override
-    protected PipelineDataConsistencyChecker 
buildPipelineDataConsistencyChecker(final PipelineJobConfiguration 
pipelineJobConfig, final InventoryIncrementalProcessContext processContext) {
-        return new MigrationDataConsistencyChecker((MigrationJobConfiguration) 
pipelineJobConfig, processContext);
+    protected PipelineDataConsistencyChecker 
buildPipelineDataConsistencyChecker(final PipelineJobConfiguration 
pipelineJobConfig, final InventoryIncrementalProcessContext processContext,
+                                                                               
  final PipelineJobProgressListener jobProgressListener) {
+        return new MigrationDataConsistencyChecker((MigrationJobConfiguration) 
pipelineJobConfig, processContext, (ConsistencyCheckJobItemContext) 
jobProgressListener);

Review Comment:
   The class cast is strange; it's better to remove it.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java:
##########
@@ -156,6 +165,45 @@ public void stopByParentJobId(final String parentJobId) {
         stop(checkLatestJobId.get());
     }
     
+    @Override
+    public ConsistencyCheckJobProgressInfo getJobProgressInfo(final String 
parentJobId) {
+        Optional<String> checkLatestJobId = 
PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+        ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(), 
() -> new PipelineJobNotFoundException(parentJobId));
+        String checkJobId = checkLatestJobId.get();
+        ConsistencyCheckJobProgress jobItemProgress = 
getJobItemProgress(checkJobId, 0);
+        ConsistencyCheckJobProgressInfo result = new 
ConsistencyCheckJobProgressInfo();
+        if (null == jobItemProgress) {
+            return result;
+        }
+        int inventoryFinishedPercentage;
+        LocalDateTime checkBeginTime = new 
Timestamp(jobItemProgress.getCheckBeginTimeMillis()).toLocalDateTime();
+        if (null != jobItemProgress.getRecordsCount() && 
Objects.equals(jobItemProgress.getCheckedRecordsCount(), 
jobItemProgress.getRecordsCount())) {
+            inventoryFinishedPercentage = 100;
+            LocalDateTime checkEndTime = new 
Timestamp(jobItemProgress.getCheckEndTimeMillis()).toLocalDateTime();
+            Duration duration = Duration.between(checkBeginTime, checkEndTime);
+            result.setCheckDuration(duration.toMillis() / 1000);
+            result.setCheckEndTime(DATE_TIME_FORMATTER.format(checkEndTime));
+            result.setRemainingTime(0L);
+        } else {
+            if (null == jobItemProgress.getRecordsCount()) {
+                inventoryFinishedPercentage = 0;
+            } else {
+                inventoryFinishedPercentage = 
BigDecimal.valueOf(Math.floorDiv(jobItemProgress.getCheckedRecordsCount() * 
100, jobItemProgress.getRecordsCount())).intValue();

Review Comment:
   `getCheckedRecordsCount` might be greater than `getRecordsCount`, but the 
percentage should not be greater than 100.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java:
##########
@@ -43,17 +48,34 @@ public final class ConsistencyCheckJobItemContext 
implements PipelineJobItemCont
     
     private volatile JobStatus status;
     
+    private String tableName;
+    
+    private volatile Long recordsCount;
+    
+    private final AtomicLong checkedRecordsCount = new AtomicLong(0);
+    
+    private final long checkBeginTimeMillis;
+    
+    private Long checkEndTimeMillis;
+    
     private final ConsistencyCheckJobConfiguration jobConfig;
     
     public ConsistencyCheckJobItemContext(final 
ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final 
JobStatus status) {
         this.jobConfig = jobConfig;
         jobId = jobConfig.getJobId();
         this.shardingItem = shardingItem;
         this.status = status;
+        this.checkBeginTimeMillis = System.currentTimeMillis();

Review Comment:
   `this.` prefix is not necessary



##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.pojo;
+
+import lombok.Data;
+
+/**
+ * Consistency check jon progress info.
+ */
+
+@Data
+public final class ConsistencyCheckJobProgressInfo {
+    
+    private String tableName;
+    
+    private Boolean checkResult;
+    
+    private int inventoryFinishedPercentage;
+    
+    private Long remainingTime;
+    
+    private String checkBeginTime;
+    
+    private String checkEndTime;
+    
+    private Long checkDuration;

Review Comment:
   These field names could be updated to keep them consistent with the DistSQL response 
column names.
   The same applies to the related yaml class,
   and also to `ConsistencyCheckJobProgress`.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java:
##########
@@ -99,8 +100,10 @@ protected void runBlocking() {
             DataConsistencyCalculateAlgorithm calculateAlgorithm = 
jobAPI.buildDataConsistencyCalculateAlgorithm(
                     parentJobConfig, checkJobConfig.getAlgorithmTypeName(), 
checkJobConfig.getAlgorithmProps());
             this.calculateAlgorithm = calculateAlgorithm;
-            Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult 
= jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm);
+            
PipelineJobProgressPersistService.addJobProgressPersistContext(checkJobId, 
jobItemContext.getShardingItem());

Review Comment:
   `addJobProgressPersistContext` already exists in `ConsistencyCheckJob`, so this call looks 
duplicated.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java:
##########
@@ -125,6 +128,7 @@ public void onSuccess() {
             jobItemContext.setStatus(JobStatus.FINISHED);
             checkJobAPI.persistJobItemProgress(jobItemContext);
             checkJobAPI.stop(checkJobId);
+            
         }

Review Comment:
   The blank line in the method should be removed.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java:
##########
@@ -56,9 +57,11 @@ public interface InventoryIncrementalJobAPI extends 
PipelineJobAPI {
      *
      * @param pipelineJobConfig job configuration
      * @param calculateAlgorithm calculate algorithm
+     * @param checkJobProgressListener consistency check job progress listener
      * @return each logic table check result
      */
-    Map<String, DataConsistencyCheckResult> 
dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig, 
DataConsistencyCalculateAlgorithm calculateAlgorithm);
+    Map<String, DataConsistencyCheckResult> 
dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig, 
DataConsistencyCalculateAlgorithm calculateAlgorithm,
+                                                                 
PipelineJobProgressListener checkJobProgressListener);

Review Comment:
   `PipelineJobProgressListener checkJobProgressListener` in this interface is 
confusing, since there's already a PipelineJobProgressListener for the inventory job.
   Could we just use `ConsistencyCheckJobItemContext`?



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java:
##########
@@ -57,11 +58,15 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
     
     private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
     
-    public MigrationDataConsistencyChecker(final MigrationJobConfiguration 
jobConfig, final InventoryIncrementalProcessContext processContext) {
+    private final ConsistencyCheckJobItemContext 
consistencyCheckJobItemContext;

Review Comment:
   `consistencyCheckJobItemContext` could be shorter, e.g. `checkJobItemContext`



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java:
##########
@@ -43,17 +48,34 @@ public final class ConsistencyCheckJobItemContext 
implements PipelineJobItemCont
     
     private volatile JobStatus status;
     
+    private String tableName;

Review Comment:
   It's better to replace `String tableName` with `Collection<String> tableNames`, 
since there might be several tables.



##########
test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java:
##########
@@ -167,16 +168,16 @@ protected void assertCheckMigrationSuccess(final String 
jobId, final String algo
         List<Map<String, Object>> checkJobResults = Collections.emptyList();
         for (int i = 0; i < 10; i++) {
             checkJobResults = queryForListWithLog(String.format("SHOW 
MIGRATION CHECK STATUS '%s'", jobId));
-            if (null != checkJobResults && !checkJobResults.isEmpty()) {
+            List<String> checkEndTimeList = checkJobResults.stream().map(map 
-> 
map.get("check_end_time").toString()).filter(StringUtils::isNotBlank).collect(Collectors.toList());
+            if (checkEndTimeList.size() == checkJobResults.size()) {
                 break;
             }
             ThreadUtil.sleep(5, TimeUnit.SECONDS);
         }
-        assertTrue(null != checkJobResults && !checkJobResults.isEmpty());
         log.info("check job results: {}", checkJobResults);
         for (Map<String, Object> entry : checkJobResults) {
-            
assertTrue(Boolean.parseBoolean(entry.get("records_count_matched").toString()));
-            
assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
+            
assertTrue(Boolean.parseBoolean(entry.get("check_result").toString()));
+            assertThat(entry.get("inventory_finished_percentage").toString(), 
is("100"));

Review Comment:
   These DistSQL response column names could be updated to keep them consistent.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java:
##########
@@ -156,6 +165,45 @@ public void stopByParentJobId(final String parentJobId) {
         stop(checkLatestJobId.get());
     }
     
+    @Override
+    public ConsistencyCheckJobProgressInfo getJobProgressInfo(final String 
parentJobId) {
+        Optional<String> checkLatestJobId = 
PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+        ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(), 
() -> new PipelineJobNotFoundException(parentJobId));
+        String checkJobId = checkLatestJobId.get();
+        ConsistencyCheckJobProgress jobItemProgress = 
getJobItemProgress(checkJobId, 0);
+        ConsistencyCheckJobProgressInfo result = new 
ConsistencyCheckJobProgressInfo();
+        if (null == jobItemProgress) {
+            return result;
+        }
+        int inventoryFinishedPercentage;

Review Comment:
   Variable names could be improved to keep them consistent with the DistSQL response 
column names,
   e.g. inventoryFinishedPercentage, and the others as well.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java:
##########
@@ -27,4 +27,14 @@
 public final class YamlConsistencyCheckJobProgress implements 
YamlConfiguration {
     
     private String status;
+    
+    private String tableName;
+    

Review Comment:
   It's better to use `tableNames`, since there might be several tables



##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.pojo;
+
+import lombok.Data;
+
+/**
+ * Consistency check jon progress info.
+ */
+
+@Data
+public final class ConsistencyCheckJobProgressInfo {
+    
+    private String tableName;
+    

Review Comment:
   It's better to use `tableNames`, since there might be several tables



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to