This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3acec5ec7d5 Add PipelineJobItemContext.toProgress() (#29071)
3acec5ec7d5 is described below

commit 3acec5ec7d54da88071a5b6171e2eb8f310ac4f1
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 18 12:59:15 2023 +0800

    Add PipelineJobItemContext.toProgress() (#29071)
---
 .../InventoryIncrementalJobItemContext.java        |  7 +++-
 .../common/context/PipelineJobItemContext.java     |  8 +++++
 .../progress/ConsistencyCheckJobItemProgress.java  | 19 ++++++++--
 .../InventoryIncrementalJobItemProgress.java       | 42 +++++++++++++++++++---
 .../AbstractInventoryIncrementalJobAPIImpl.java    | 32 ++---------------
 .../api/impl/ConsistencyCheckJobAPI.java           | 12 +------
 .../context/ConsistencyCheckJobItemContext.java    |  7 ++++
 7 files changed, 77 insertions(+), 50 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/InventoryIncrementalJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/InventoryIncrementalJobItemContext.java
index 8a2ca947905..ae0cd2f48c2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/InventoryIncrementalJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/InventoryIncrementalJobItemContext.java
@@ -17,9 +17,9 @@
 
 package org.apache.shardingsphere.data.pipeline.common.context;
 
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressListener;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 
@@ -88,4 +88,9 @@ public interface InventoryIncrementalJobItemContext extends 
PipelineJobItemConte
      * @return inventory records count
      */
     long getInventoryRecordsCount();
+    
+    @Override
+    default InventoryIncrementalJobItemProgress toProgress() {
+        return new InventoryIncrementalJobItemProgress(this);
+    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineJobItemContext.java
index ded73005aed..154aa659438 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineJobItemContext.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.common.context;
 
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
 
 /**
  * Pipeline job item context.
@@ -85,4 +86,11 @@ public interface PipelineJobItemContext {
      * @return stopping
      */
     boolean isStopping();
+    
+    /**
+     * Convert to pipeline job item progress.
+     * 
+     * @return converted pipeline job item progress
+     */
+    PipelineJobItemProgress toProgress();
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
index f43db99d70f..0a1ac665b29 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
@@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 
 import java.util.Map;
 
@@ -34,9 +35,6 @@ import java.util.Map;
 // TODO Refactor structure, List<TableProgress>
 public final class ConsistencyCheckJobItemProgress implements 
PipelineJobItemProgress {
     
-    @Setter
-    private JobStatus status = JobStatus.RUNNING;
-    
     private final String tableNames;
     
     private final String ignoredTableNames;
@@ -54,4 +52,19 @@ public final class ConsistencyCheckJobItemProgress 
implements PipelineJobItemPro
     private final Map<String, Object> targetTableCheckPositions;
     
     private final String sourceDatabaseType;
+    
+    @Setter
+    private JobStatus status = JobStatus.RUNNING;
+    
+    public ConsistencyCheckJobItemProgress(final 
ConsistencyCheckJobItemProgressContext context) {
+        tableNames = String.join(",", context.getTableNames());
+        ignoredTableNames = String.join(",", context.getIgnoredTableNames());
+        checkedRecordsCount = context.getCheckedRecordsCount().get();
+        recordsCount = context.getRecordsCount();
+        checkBeginTimeMillis = context.getCheckBeginTimeMillis();
+        checkEndTimeMillis = context.getCheckEndTimeMillis();
+        sourceTableCheckPositions = context.getSourceTableCheckPositions();
+        targetTableCheckPositions = context.getTargetTableCheckPositions();
+        sourceDatabaseType = context.getSourceDatabaseType();
+    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java
index 043b00b9f43..d0ffdb41df4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java
@@ -18,30 +18,62 @@
 package org.apache.shardingsphere.data.pipeline.common.job.progress;
 
 import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.Setter;
+import 
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
+import 
org.apache.shardingsphere.data.pipeline.common.task.progress.InventoryTaskProgress;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Inventory incremental job item progress.
  */
+@NoArgsConstructor
 @Getter
 @Setter
 public final class InventoryIncrementalJobItemProgress implements 
PipelineJobItemProgress {
     
-    private JobStatus status = JobStatus.RUNNING;
-    
     private DatabaseType sourceDatabaseType;
     
     private String dataSourceName;
     
-    private boolean active;
-    
     private JobItemInventoryTasksProgress inventory;
     
     private JobItemIncrementalTasksProgress incremental;
     
+    private long inventoryRecordsCount;
+    
     private long processedRecordsCount;
     
-    private long inventoryRecordsCount;
+    private boolean active;
+    
+    private JobStatus status = JobStatus.RUNNING;
+    
+    public InventoryIncrementalJobItemProgress(final 
InventoryIncrementalJobItemContext context) {
+        sourceDatabaseType = context.getJobConfig().getSourceDatabaseType();
+        dataSourceName = context.getDataSourceName();
+        inventory = getInventoryTasksProgress(context.getInventoryTasks());
+        incremental = 
getIncrementalTasksProgress(context.getIncrementalTasks());
+        inventoryRecordsCount = context.getInventoryRecordsCount();
+        processedRecordsCount = context.getProcessedRecordsCount();
+        status = context.getStatus();
+    }
+    
+    private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final 
Collection<PipelineTask> incrementalTasks) {
+        return new JobItemIncrementalTasksProgress(incrementalTasks.isEmpty() 
? null : (IncrementalTaskProgress) 
incrementalTasks.iterator().next().getTaskProgress());
+    }
+    
+    private JobItemInventoryTasksProgress getInventoryTasksProgress(final 
Collection<PipelineTask> inventoryTasks) {
+        Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new 
HashMap<>();
+        for (PipelineTask each : inventoryTasks) {
+            inventoryTaskProgressMap.put(each.getTaskId(), 
(InventoryTaskProgress) each.getTaskProgress());
+        }
+        return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
+    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 7462f12669e..372971129cc 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -23,13 +23,10 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
-import 
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemInventoryTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
@@ -38,8 +35,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobO
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
-import 
org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
-import 
org.apache.shardingsphere.data.pipeline.common.task.progress.InventoryTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -47,7 +42,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncreme
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
-import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
@@ -55,7 +49,6 @@ import 
org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -134,35 +127,14 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl implements Inventor
                 .persistJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
     }
     
-    private String convertJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
-        InventoryIncrementalJobItemContext context = 
(InventoryIncrementalJobItemContext) jobItemContext;
-        InventoryIncrementalJobItemProgress jobItemProgress = new 
InventoryIncrementalJobItemProgress();
-        jobItemProgress.setStatus(context.getStatus());
-        
jobItemProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType());
-        jobItemProgress.setDataSourceName(context.getDataSourceName());
-        
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
-        
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
-        
jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
-        
jobItemProgress.setInventoryRecordsCount(context.getInventoryRecordsCount());
-        return 
YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress));
-    }
-    
     @Override
     public void updateJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
         
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
                 .updateJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
     }
     
-    private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final 
Collection<PipelineTask> incrementalTasks) {
-        return new JobItemIncrementalTasksProgress(incrementalTasks.isEmpty() 
? null : (IncrementalTaskProgress) 
incrementalTasks.iterator().next().getTaskProgress());
-    }
-    
-    private JobItemInventoryTasksProgress getInventoryTasksProgress(final 
Collection<PipelineTask> inventoryTasks) {
-        Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new 
HashMap<>();
-        for (PipelineTask each : inventoryTasks) {
-            inventoryTaskProgressMap.put(each.getTaskId(), 
(InventoryTaskProgress) each.getTaskProgress());
-        }
-        return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
+    private String convertJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
+        return 
YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration((InventoryIncrementalJobItemProgress)
 jobItemContext.toProgress()));
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index e96f1b2e4f3..da0f06e6ed9 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -27,7 +27,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlCons
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
@@ -44,7 +43,6 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.poj
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -129,15 +127,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
     }
     
     private String convertJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
-        ConsistencyCheckJobItemContext context = 
(ConsistencyCheckJobItemContext) jobItemContext;
-        ConsistencyCheckJobItemProgressContext progressContext = 
context.getProgressContext();
-        String tableNames = String.join(",", progressContext.getTableNames());
-        String ignoredTableNames = String.join(",", 
progressContext.getIgnoredTableNames());
-        ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress(tableNames, ignoredTableNames, 
progressContext.getCheckedRecordsCount().get(),
-                progressContext.getRecordsCount(), 
progressContext.getCheckBeginTimeMillis(), 
progressContext.getCheckEndTimeMillis(),
-                progressContext.getSourceTableCheckPositions(), 
progressContext.getTargetTableCheckPositions(), 
progressContext.getSourceDatabaseType());
-        jobItemProgress.setStatus(context.getStatus());
-        return 
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress));
+        return 
YamlEngine.marshal(swapper.swapToYamlConfiguration((ConsistencyCheckJobItemProgress)
 jobItemContext.toProgress()));
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
index 5dd7865398b..30bed68443f 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
@@ -69,4 +69,11 @@ public final class ConsistencyCheckJobItemContext implements 
PipelineJobItemCont
     public PipelineProcessContext getJobProcessContext() {
         return processContext;
     }
+    
+    @Override
+    public ConsistencyCheckJobItemProgress toProgress() {
+        ConsistencyCheckJobItemProgress result = new 
ConsistencyCheckJobItemProgress(progressContext);
+        result.setStatus(status);
+        return result;
+    }
 }

Reply via email to