sandynz commented on code in PR #19875:
URL: https://github.com/apache/shardingsphere/pull/19875#discussion_r938522515


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java:
##########
@@ -101,9 +88,6 @@ public int getInventoryFinishedPercentage() {
      * @return latest active time, <code>0</code> is there is no activity
      */
     public long getIncrementalLatestActiveTimeMillis() {
-        List<Long> delays = incrementalTaskProgressMap.values().stream()
-                .map(each -> 
each.getIncrementalTaskDelay().getLatestActiveTimeMillis())
-                .collect(Collectors.toList());
-        return delays.stream().reduce(Long::max).orElse(0L);
+        return incremental == null ? 0L : 
incremental.getIncrementalLatestActiveTimeMillis();

Review Comment:
   `var == null` could be replaced to `null == var`, there're 4 places could be 
improved.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java:
##########
@@ -17,24 +17,49 @@
 
 package org.apache.shardingsphere.data.pipeline.api.task.progress;
 
-import lombok.AllArgsConstructor;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
 import lombok.Getter;
-import lombok.NoArgsConstructor;
+import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 
 /**
  * Incremental task progress.
  */
-@NoArgsConstructor
-@AllArgsConstructor
+@RequiredArgsConstructor
 @Getter
 @Setter
 @ToString
 public final class IncrementalTaskProgress implements TaskProgress {
     
-    private volatile IngestPosition<?> position;
+    private final Map<String, IncrementalTaskProgressItem> 
incrementalTaskProgressItemMap;
     
-    private IncrementalTaskDelay incrementalTaskDelay = new 
IncrementalTaskDelay();

Review Comment:
   Could we keep the similar structure as YamlIncrementalTaskProgress?



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java:
##########
@@ -62,26 +60,25 @@ public void persistJobProgress(final RuleAlteredJobContext 
jobContext) {
         JobProgress jobProgress = new JobProgress();
         jobProgress.setStatus(jobContext.getStatus());
         
jobProgress.setSourceDatabaseType(jobContext.getJobConfig().getSourceDatabaseType());
-        
jobProgress.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jobContext));
-        
jobProgress.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jobContext));
+        jobProgress.setIncremental(getIncrementalTaskProgress(jobContext));
+        jobProgress.setInventory(getInventoryTaskProgress(jobContext));
         String value = YamlEngine.marshal(SWAPPER.swapToYaml(jobProgress));
         
repository.persist(PipelineMetaDataNode.getScalingJobOffsetPath(jobContext.getJobId(),
 jobContext.getShardingItem()), value);
     }
     
-    private Map<String, IncrementalTaskProgress> 
getIncrementalTaskProgressMap(final RuleAlteredJobContext jobContext) {
-        Map<String, IncrementalTaskProgress> result = new 
HashMap<>(jobContext.getIncrementalTasks().size(), 1);
-        for (IncrementalTask each : jobContext.getIncrementalTasks()) {
-            result.put(each.getTaskId(), each.getProgress());
-        }
-        return result;
+    private IncrementalTaskProgress getIncrementalTaskProgress(final 
RuleAlteredJobContext jobContext) {
+        return new IncrementalTaskProgress(
+                jobContext.getIncrementalTasks()
+                .stream().collect(Collectors.toMap(IncrementalTask::getTaskId, 
each -> 
each.getProgress().getIncrementalTaskProgressItemMap().get(each.getTaskId())))
+        );
     }
     
-    private Map<String, InventoryTaskProgress> 
getInventoryTaskProgressMap(final RuleAlteredJobContext jobContext) {
-        Map<String, InventoryTaskProgress> result = new 
HashMap<>(jobContext.getInventoryTasks().size(), 1);
-        for (InventoryTask each : jobContext.getInventoryTasks()) {
-            result.put(each.getTaskId(), each.getProgress());
-        }
-        return result;
+    private InventoryTaskProgress getInventoryTaskProgress(final 
RuleAlteredJobContext jobContext) {
+        return new InventoryTaskProgress(
+                jobContext.getInventoryTasks()
+                        .stream()
+                        .collect(Collectors.toMap(InventoryTask::getTaskId, 
each -> 
each.getProgress().getInventoryTaskProgressItemMap().get(each.getTaskId())))
+        );
     }

Review Comment:
   It's better use for each, since collection stream might hurt performance on 
JDK8.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java:
##########
@@ -46,19 +41,17 @@ public final class JobProgress implements 
PipelineJobProgress {
     
     private boolean active;
     
-    private Map<String, InventoryTaskProgress> inventoryTaskProgressMap;
+    private InventoryTaskProgress inventory;
     
-    private Map<String, IncrementalTaskProgress> incrementalTaskProgressMap;
+    private IncrementalTaskProgress incremental;

Review Comment:
   After reading through the changes, I changed some ideas. In order to keep it 
more understandable, we'd better not change the meaning of TaskProgress 
implementations and not add XxxTaskProgressItem.
   
   Possible changes:
   
   1, Could we keep InventoryTaskProgress and IncrementalTaskProgress 
unchanged? Since InventoryTask.getProgress() return InventoryTaskProgress and 
IncrementalTask.getProgress() return IncrementalTaskProgress.
   
   2, We just add new classes, e.g.
   - InventoryTaskProgresses to encapsulate `Map<String, InventoryTaskProgress> 
inventoryTaskProgressMap`
   - IncrementalTaskProgresses to encapsulate `Map<String, 
IncrementalTaskProgress> incrementalTaskProgressMap`
   
   



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java:
##########
@@ -46,19 +41,17 @@ public final class JobProgress implements 
PipelineJobProgress {
     
     private boolean active;
     
-    private Map<String, InventoryTaskProgress> inventoryTaskProgressMap;
+    private InventoryTaskProgress inventory;
     
-    private Map<String, IncrementalTaskProgress> incrementalTaskProgressMap;
+    private IncrementalTaskProgress incremental;
     
     /**
-     * Get incremental position.
-     *
-     * @param dataSourceName data source name
+     * get incremental position.
+     * @param dataSourceName dataSource
      * @return incremental position
      */

Review Comment:
   Javadoc first character should be upper case. And it's better to leave a 
blank line better parameters.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to