sandynz commented on code in PR #21019:
URL: https://github.com/apache/shardingsphere/pull/21019#discussion_r972859949
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java:
##########
@@ -44,4 +45,11 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte
* @return incremental tasks
*/
Collection<IncrementalTask> getIncrementalTasks();
+
+ /**
+ * Get processed record count.
+ *
+ * @return processed record count.
+ */
+ long getProcessedRecordCount();
Review Comment:
Could we change it to `getProcessedRecordsCount`?
##########
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java:
##########
@@ -60,14 +61,15 @@ public void init(final ShardingSphereDatabase database, final SQLStatement sqlSt
result.add("");
result.add("");
result.add("");
+ result.add("");
}
return result;
}).collect(Collectors.toList()).iterator();
}
@Override
public Collection<String> getColumnNames() {
- return Arrays.asList("item", "data_source", "status", "active", "inventory_finished_percentage", "incremental_idle_seconds");
+ return Arrays.asList("item", "data_source", "status", "active", "processed_records", "inventory_finished_percentage", "incremental_idle_seconds");
Review Comment:
Could we use `processed_records_count` instead, to keep it consistent with the variable name?
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProcessUpdateParameter.java:
##########
@@ -15,24 +15,19 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.job.progress.listener;
+package org.apache.shardingsphere.data.pipeline.api.job.progress.listener;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
/**
- * Default pipeline job progress listener implementation.
+ * Pipeline job process update parameter.
*/
@RequiredArgsConstructor
-public final class DefaultPipelineJobProgressListener implements PipelineJobProgressListener {
+@Getter
+public final class PipelineJobProcessUpdateParameter {
Review Comment:
Could we rename the class `PipelineJobProcessUpdateParameter` to `PipelineJobProgressUpdatedParameter`? That would match the method name `onProgressUpdated`.
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java:
##########
@@ -37,4 +37,6 @@ public final class YamlInventoryIncrementalJobItemProgress implements YamlConfig
private YamlJobItemInventoryTasksProgress inventory;
private YamlJobItemIncrementalTasksProgress incremental;
+
+ private long processedRecordCount;
Review Comment:
Please use `processedRecordsCount` here too, and also in the YAML class.
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java:
##########
@@ -39,4 +39,6 @@ public final class InventoryIncrementalJobItemProgress implements PipelineJobIte
private JobItemInventoryTasksProgress inventory;
private JobItemIncrementalTasksProgress incremental;
+
+ private long processedRecordCount;
Review Comment:
Could we use `processedRecordsCount` instead of `processedRecordCount`?
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProcessUpdateParameter.java:
##########
@@ -15,24 +15,19 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.job.progress.listener;
+package org.apache.shardingsphere.data.pipeline.api.job.progress.listener;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
/**
- * Default pipeline job progress listener implementation.
+ * Pipeline job process update parameter.
*/
@RequiredArgsConstructor
-public final class DefaultPipelineJobProgressListener implements PipelineJobProgressListener {
+@Getter
+public final class PipelineJobProcessUpdateParameter {
- private final String jobId;
+ private final int insertRecordNumber;
- private final int shardingItem;
-
- @Override
- public void onProgressUpdated() {
- PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
- }
+ private final int deleteRecordNumber;
Review Comment:
1. Could we replace `RecordNumber` with `RecordsCount`?
2. `insert` and `delete` could be `inserted` and `deleted`.
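
For illustration, a minimal sketch of how the parameter class might look with the renames suggested for this file applied. It is written in plain Java rather than with Lombok's `@RequiredArgsConstructor` and `@Getter` so that it stands alone, and the field names `insertedRecordsCount` and `deletedRecordsCount` are an assumed reading of the suggestion, not the final code in the PR.

```java
// Hypothetical plain-Java equivalent of the Lombok-annotated class in the diff,
// using the suggested class name and the assumed field names.
final class PipelineJobProgressUpdatedParameter {

    private final int insertedRecordsCount;

    private final int deletedRecordsCount;

    PipelineJobProgressUpdatedParameter(final int insertedRecordsCount, final int deletedRecordsCount) {
        this.insertedRecordsCount = insertedRecordsCount;
        this.deletedRecordsCount = deletedRecordsCount;
    }

    // Number of records inserted since the last progress update.
    int getInsertedRecordsCount() {
        return insertedRecordsCount;
    }

    // Number of records deleted since the last progress update.
    int getDeletedRecordsCount() {
        return deletedRecordsCount;
    }
}
```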
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java:
##########
@@ -92,6 +97,9 @@ public MigrationJobItemContext(final MigrationJobConfiguration jobConfig, final
this.shardingItem = shardingItem;
this.dataSourceName = taskConfig.getDataSourceName();
this.initProgress = initProgress;
+ if (null != initProgress) {
+ processedRecordCount.addAndGet(initProgress.getProcessedRecordCount());
+ }
Review Comment:
It's better to use `set` instead of `addAndGet`
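
A minimal sketch of the difference, assuming `processedRecordCount` is a `java.util.concurrent.atomic.AtomicLong` that starts at zero (its declaration is not shown in the diff). The class `ProcessedRecordsCounter` and its method names are hypothetical, for illustration only.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the counter held by the job item context.
final class ProcessedRecordsCounter {

    private final AtomicLong processedRecordsCount = new AtomicLong();

    // Restoring persisted progress is an assignment, so set() states the intent directly
    // and gives the same result even if it is called more than once.
    void restore(final long persistedCount) {
        processedRecordsCount.set(persistedCount);
    }

    // Applying a delta while the job runs is an accumulation, so addAndGet() fits here.
    long add(final long delta) {
        return processedRecordsCount.addAndGet(delta);
    }

    long get() {
        return processedRecordsCount.get();
    }
}
```

On a freshly created counter both calls produce the same value; `set` simply avoids implying that the initial progress is being accumulated onto an earlier count.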
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java:
##########
@@ -125,4 +133,16 @@ public PipelineTableMetaDataLoader getSourceMetaDataLoader() {
public boolean isSourceTargetDatabaseTheSame() {
return jobConfig.getSourceDatabaseType().equalsIgnoreCase(jobConfig.getTargetDatabaseType());
}
+
+ @Override
+ public void onProgressUpdated(final PipelineJobProcessUpdateParameter parameter) {
+ int needAddNumber = parameter.getInsertRecordNumber() - parameter.getDeleteRecordNumber();
+ processedRecordCount.addAndGet(needAddNumber);
+ PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
+ }
+
+ @Override
+ public long getProcessedRecordCount() {
+ return processedRecordCount.get();
+ }
Review Comment:
Please use `processedRecordsCount` here too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.