sandynz commented on code in PR #21019:
URL: https://github.com/apache/shardingsphere/pull/21019#discussion_r972859949


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java:
##########
@@ -44,4 +45,11 @@ public interface InventoryIncrementalJobItemContext extends 
PipelineJobItemConte
      * @return incremental tasks
      */
     Collection<IncrementalTask> getIncrementalTasks();
+    
+    /**
+     * Get processed record count.
+     *
+     * @return processed record count.
+     */
+    long getProcessedRecordCount();

Review Comment:
   Could we change it to `getProcessedRecordsCount`?



##########
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java:
##########
@@ -60,14 +61,15 @@ public void init(final ShardingSphereDatabase database, 
final SQLStatement sqlSt
                         result.add("");
                         result.add("");
                         result.add("");
+                        result.add("");
                     }
                     return result;
                 }).collect(Collectors.toList()).iterator();
     }
     
     @Override
     public Collection<String> getColumnNames() {
-        return Arrays.asList("item", "data_source", "status", "active", 
"inventory_finished_percentage", "incremental_idle_seconds");
+        return Arrays.asList("item", "data_source", "status", "active", 
"processed_records", "inventory_finished_percentage", 
"incremental_idle_seconds");

Review Comment:
   Could we use `processed_records_count`? Keep consistency with variable



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProcessUpdateParameter.java:
##########
@@ -15,24 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.job.progress.listener;
+package org.apache.shardingsphere.data.pipeline.api.job.progress.listener;
 
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 
 /**
- * Default pipeline job progress listener implementation.
+ * Pipeline job process update parameter.
  */
 @RequiredArgsConstructor
-public final class DefaultPipelineJobProgressListener implements 
PipelineJobProgressListener {
+@Getter
+public final class PipelineJobProcessUpdateParameter {

Review Comment:
   Could we rename class name `PipelineJobProcessUpdateParameter` to 
`PipelineJobProgressUpdatedParameter`? Similar to method name 
`onProgressUpdated`



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java:
##########
@@ -37,4 +37,6 @@ public final class YamlInventoryIncrementalJobItemProgress 
implements YamlConfig
     private YamlJobItemInventoryTasksProgress inventory;
     
     private YamlJobItemIncrementalTasksProgress incremental;
+    
+    private long processedRecordCount;

Review Comment:
   processedRecordsCount too, and also yaml class



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java:
##########
@@ -39,4 +39,6 @@ public final class InventoryIncrementalJobItemProgress 
implements PipelineJobIte
     private JobItemInventoryTasksProgress inventory;
     
     private JobItemIncrementalTasksProgress incremental;
+    
+    private long processedRecordCount;

Review Comment:
   Could use `processedRecordsCount` to replace `processedRecordCount`?



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProcessUpdateParameter.java:
##########
@@ -15,24 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.job.progress.listener;
+package org.apache.shardingsphere.data.pipeline.api.job.progress.listener;
 
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 
 /**
- * Default pipeline job progress listener implementation.
+ * Pipeline job process update parameter.
  */
 @RequiredArgsConstructor
-public final class DefaultPipelineJobProgressListener implements 
PipelineJobProgressListener {
+@Getter
+public final class PipelineJobProcessUpdateParameter {
     
-    private final String jobId;
+    private final int insertRecordNumber;
     
-    private final int shardingItem;
-    
-    @Override
-    public void onProgressUpdated() {
-        PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
-    }
+    private final int deleteRecordNumber;

Review Comment:
   1, Could we replace `RecordNumber` to `RecordsCount`?
   
   2, insert and deleted could inserted and deleted
   
   



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java:
##########
@@ -92,6 +97,9 @@ public MigrationJobItemContext(final 
MigrationJobConfiguration jobConfig, final
         this.shardingItem = shardingItem;
         this.dataSourceName = taskConfig.getDataSourceName();
         this.initProgress = initProgress;
+        if (null != initProgress) {
+            
processedRecordCount.addAndGet(initProgress.getProcessedRecordCount());
+        }

Review Comment:
   It's better to use `set` instead of `addAndGet`



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java:
##########
@@ -125,4 +133,16 @@ public PipelineTableMetaDataLoader 
getSourceMetaDataLoader() {
     public boolean isSourceTargetDatabaseTheSame() {
         return 
jobConfig.getSourceDatabaseType().equalsIgnoreCase(jobConfig.getTargetDatabaseType());
     }
+    
+    @Override
+    public void onProgressUpdated(final PipelineJobProcessUpdateParameter 
parameter) {
+        int needAddNumber = parameter.getInsertRecordNumber() - 
parameter.getDeleteRecordNumber();
+        processedRecordCount.addAndGet(needAddNumber);
+        PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
+    }
+    
+    @Override
+    public long getProcessedRecordCount() {
+        return processedRecordCount.get();
+    }

Review Comment:
   processedRecordsCount too



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to