sandynz commented on code in PR #19875:
URL: https://github.com/apache/shardingsphere/pull/19875#discussion_r938522515
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java:
##########
@@ -101,9 +88,6 @@ public int getInventoryFinishedPercentage() {
* @return latest active time, <code>0</code> is there is no activity
*/
public long getIncrementalLatestActiveTimeMillis() {
- List<Long> delays = incrementalTaskProgressMap.values().stream()
- .map(each -> each.getIncrementalTaskDelay().getLatestActiveTimeMillis())
- .collect(Collectors.toList());
- return delays.stream().reduce(Long::max).orElse(0L);
+ return incremental == null ? 0L : incremental.getIncrementalLatestActiveTimeMillis();
Review Comment:
`var == null` could be replaced with `null == var`; there are 4 places that
could be improved.
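For illustration, a minimal sketch of the requested comparison order applied to a getter shaped like the one in this hunk. `NullCheckStyleExample` and its nested `Progress` class are hypothetical stand-ins, not the PR's real classes; only the `null == incremental` ordering is the point.

```java
final class NullCheckStyleExample {

    // Hypothetical stand-in for the PR's IncrementalTaskProgress.
    static final class Progress {

        private final long latestActiveTimeMillis;

        Progress(final long latestActiveTimeMillis) {
            this.latestActiveTimeMillis = latestActiveTimeMillis;
        }

        long getIncrementalLatestActiveTimeMillis() {
            return latestActiveTimeMillis;
        }
    }

    // The null literal goes on the left: `null == incremental`, not `incremental == null`.
    static long getIncrementalLatestActiveTimeMillis(final Progress incremental) {
        return null == incremental ? 0L : incremental.getIncrementalLatestActiveTimeMillis();
    }

    public static void main(final String[] args) {
        System.out.println(getIncrementalLatestActiveTimeMillis(null));
        System.out.println(getIncrementalLatestActiveTimeMillis(new Progress(42L)));
    }
}
```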
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java:
##########
@@ -17,24 +17,49 @@
package org.apache.shardingsphere.data.pipeline.api.task.progress;
-import lombok.AllArgsConstructor;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
import lombok.Getter;
-import lombok.NoArgsConstructor;
+import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
/**
* Incremental task progress.
*/
-@NoArgsConstructor
-@AllArgsConstructor
+@RequiredArgsConstructor
@Getter
@Setter
@ToString
public final class IncrementalTaskProgress implements TaskProgress {
- private volatile IngestPosition<?> position;
+ private final Map<String, IncrementalTaskProgressItem> incrementalTaskProgressItemMap;
- private IncrementalTaskDelay incrementalTaskDelay = new IncrementalTaskDelay();
Review Comment:
Could we keep a structure similar to YamlIncrementalTaskProgress?
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java:
##########
@@ -62,26 +60,25 @@ public void persistJobProgress(final RuleAlteredJobContext jobContext) {
JobProgress jobProgress = new JobProgress();
jobProgress.setStatus(jobContext.getStatus());
jobProgress.setSourceDatabaseType(jobContext.getJobConfig().getSourceDatabaseType());
- jobProgress.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jobContext));
- jobProgress.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jobContext));
+ jobProgress.setIncremental(getIncrementalTaskProgress(jobContext));
+ jobProgress.setInventory(getInventoryTaskProgress(jobContext));
String value = YamlEngine.marshal(SWAPPER.swapToYaml(jobProgress));
repository.persist(PipelineMetaDataNode.getScalingJobOffsetPath(jobContext.getJobId(), jobContext.getShardingItem()), value);
}
- private Map<String, IncrementalTaskProgress> getIncrementalTaskProgressMap(final RuleAlteredJobContext jobContext) {
- Map<String, IncrementalTaskProgress> result = new HashMap<>(jobContext.getIncrementalTasks().size(), 1);
- for (IncrementalTask each : jobContext.getIncrementalTasks()) {
- result.put(each.getTaskId(), each.getProgress());
- }
- return result;
+ private IncrementalTaskProgress getIncrementalTaskProgress(final RuleAlteredJobContext jobContext) {
+ return new IncrementalTaskProgress(
+ jobContext.getIncrementalTasks()
+ .stream().collect(Collectors.toMap(IncrementalTask::getTaskId, each -> each.getProgress().getIncrementalTaskProgressItemMap().get(each.getTaskId())))
+ );
}
- private Map<String, InventoryTaskProgress> getInventoryTaskProgressMap(final RuleAlteredJobContext jobContext) {
- Map<String, InventoryTaskProgress> result = new HashMap<>(jobContext.getInventoryTasks().size(), 1);
- for (InventoryTask each : jobContext.getInventoryTasks()) {
- result.put(each.getTaskId(), each.getProgress());
- }
- return result;
+ private InventoryTaskProgress getInventoryTaskProgress(final RuleAlteredJobContext jobContext) {
+ return new InventoryTaskProgress(
+ jobContext.getInventoryTasks()
+ .stream()
+ .collect(Collectors.toMap(InventoryTask::getTaskId, each -> each.getProgress().getInventoryTaskProgressItemMap().get(each.getTaskId())))
+ );
}
Review Comment:
It's better to use a for-each loop, since collection streams might hurt
performance on JDK 8.
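A minimal sketch of the for-each form, modeled on the loop that this hunk removes. `ForEachInsteadOfStreamExample` and `Task` are hypothetical stand-ins for the PR's task classes, and progress is reduced to a `String` purely for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ForEachInsteadOfStreamExample {

    // Hypothetical stand-in for IncrementalTask / InventoryTask.
    static final class Task {

        private final String taskId;

        private final String progress;

        Task(final String taskId, final String progress) {
            this.taskId = taskId;
            this.progress = progress;
        }

        String getTaskId() {
            return taskId;
        }

        String getProgress() {
            return progress;
        }
    }

    // Plain for-each loop into a pre-sized HashMap, instead of stream().collect(Collectors.toMap(...)).
    static Map<String, String> collectProgress(final List<Task> tasks) {
        Map<String, String> result = new HashMap<>(tasks.size(), 1);
        for (Task each : tasks) {
            result.put(each.getTaskId(), each.getProgress());
        }
        return result;
    }

    public static void main(final String[] args) {
        List<Task> tasks = Arrays.asList(new Task("ds_0", "pos_0"), new Task("ds_1", "pos_1"));
        System.out.println(collectProgress(tasks).size());
    }
}
```

One behavioral difference worth checking when converting: `Collectors.toMap` throws a `NullPointerException` for a null value, while `HashMap.put` accepts it.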
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java:
##########
@@ -46,19 +41,17 @@ public final class JobProgress implements PipelineJobProgress {
private boolean active;
- private Map<String, InventoryTaskProgress> inventoryTaskProgressMap;
+ private InventoryTaskProgress inventory;
- private Map<String, IncrementalTaskProgress> incrementalTaskProgressMap;
+ private IncrementalTaskProgress incremental;
Review Comment:
After reading through the changes, I have changed my mind on some points. To
keep the code understandable, we'd better not change the meaning of the
TaskProgress implementations, and not add XxxTaskProgressItem.
Possible changes:
1. Could we keep InventoryTaskProgress and IncrementalTaskProgress
unchanged, since InventoryTask.getProgress() returns InventoryTaskProgress and
IncrementalTask.getProgress() returns IncrementalTaskProgress?
2. We just add new classes, e.g.
- InventoryTaskProgresses to encapsulate `Map<String, InventoryTaskProgress> inventoryTaskProgressMap`
- IncrementalTaskProgresses to encapsulate `Map<String, IncrementalTaskProgress> incrementalTaskProgressMap`
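A rough sketch of what such a wrapper might look like, under stated assumptions: the nested `InventoryTaskProgress` below is a simplified stand-in for the existing class (reduced to a single `finished` flag), and `getFinishedPercentage()` is a hypothetical helper added only to show that map-level logic could live in the wrapper. IncrementalTaskProgresses would follow the same shape.

```java
import java.util.HashMap;
import java.util.Map;

final class TaskProgressesExample {

    // Simplified stand-in for the existing InventoryTaskProgress, which stays unchanged.
    static final class InventoryTaskProgress {

        private final boolean finished;

        InventoryTaskProgress(final boolean finished) {
            this.finished = finished;
        }

        boolean isFinished() {
            return finished;
        }
    }

    // Sketch of the proposed wrapper: it only encapsulates the map keyed by task id.
    static final class InventoryTaskProgresses {

        private final Map<String, InventoryTaskProgress> inventoryTaskProgressMap;

        InventoryTaskProgresses(final Map<String, InventoryTaskProgress> inventoryTaskProgressMap) {
            this.inventoryTaskProgressMap = inventoryTaskProgressMap;
        }

        Map<String, InventoryTaskProgress> getInventoryTaskProgressMap() {
            return inventoryTaskProgressMap;
        }

        // Hypothetical helper: share of finished tasks, 0 when there are no tasks.
        int getFinishedPercentage() {
            if (inventoryTaskProgressMap.isEmpty()) {
                return 0;
            }
            long finishedCount = 0;
            for (InventoryTaskProgress each : inventoryTaskProgressMap.values()) {
                if (each.isFinished()) {
                    finishedCount++;
                }
            }
            return (int) (finishedCount * 100 / inventoryTaskProgressMap.size());
        }
    }

    public static void main(final String[] args) {
        Map<String, InventoryTaskProgress> map = new HashMap<>();
        map.put("task_0", new InventoryTaskProgress(true));
        map.put("task_1", new InventoryTaskProgress(false));
        System.out.println(new InventoryTaskProgresses(map).getFinishedPercentage());
    }
}
```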
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java:
##########
@@ -46,19 +41,17 @@ public final class JobProgress implements PipelineJobProgress {
private boolean active;
- private Map<String, InventoryTaskProgress> inventoryTaskProgressMap;
+ private InventoryTaskProgress inventory;
- private Map<String, IncrementalTaskProgress> incrementalTaskProgressMap;
+ private IncrementalTaskProgress incremental;
/**
- * Get incremental position.
- *
- * @param dataSourceName data source name
+ * get incremental position.
+ * @param dataSourceName dataSource
* @return incremental position
*/
Review Comment:
The first character of the Javadoc should be upper case, and it's better to
leave a blank line before the parameters.
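For reference, a sketch of the Javadoc shape being asked for, matching the comment text that this hunk removes. The enclosing class and the method body are hypothetical placeholders; only the comment layout matters here.

```java
final class JavadocStyleExample {

    /**
     * Get incremental position.
     *
     * @param dataSourceName data source name
     * @return incremental position
     */
    static String getIncrementalPosition(final String dataSourceName) {
        // Placeholder body for illustration only; the real method reads from the job progress.
        return "position-of-" + dataSourceName;
    }

    public static void main(final String[] args) {
        System.out.println(getIncrementalPosition("ds_0"));
    }
}
```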
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.