This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f6338ac9327 Improve list() impl for pipeline job api (#20540)
f6338ac9327 is described below

commit f6338ac93278ff3c8538cecc09e910cdbad86340
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Aug 26 13:20:00 2022 +0800

    Improve list() impl for pipeline job api (#20540)
    
    * Improve list() impl
    
    * Filter by job type for list()
---
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  | 14 ++++++++++--
 .../scenario/migration/MigrationJobAPIImpl.java    | 26 ++++++++++------------
 2 files changed, 24 insertions(+), 16 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 4ba03f71e41..941d2027fb2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -130,10 +130,18 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
     }
     
     private Stream<JobBriefInfo> getJobBriefInfos() {
-        return PipelineAPIFactory.getJobStatisticsAPI().getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"));
+        return PipelineAPIFactory.getJobStatisticsAPI().getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"))
+                .filter(each -> PipelineJobIdUtils.parseJobType(each.getJobName()) == getJobType());
     }
     
-    protected abstract PipelineJobInfo getJobInfo(String jobName);
+    protected abstract PipelineJobInfo getJobInfo(String jobId);
+    
+    protected void fillJobInfo(final PipelineJobInfo jobInfo, final JobConfigurationPOJO jobConfigPOJO) {
+        jobInfo.setActive(!jobConfigPOJO.isDisabled());
+        jobInfo.setShardingTotalCount(jobConfigPOJO.getShardingTotalCount());
+        jobInfo.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time"));
+        jobInfo.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time"));
+    }
     
     @Override
     public Optional<String> start(final PipelineJobConfiguration jobConfig) {
@@ -165,6 +173,8 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
     
     protected abstract YamlPipelineJobConfiguration swapToYamlJobConfiguration(PipelineJobConfiguration jobConfig);
     
+    protected abstract PipelineJobConfiguration getJobConfiguration(JobConfigurationPOJO jobConfigPOJO);
+    
     @Override
     public void startDisabledJob(final String jobId) {
         log.info("Start disabled pipeline job {}", jobId);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 9102c356ebe..6b4d8413a0b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -127,6 +127,16 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
         return (List<MigrationJobInfo>) super.list();
     }
     
+    @Override
+    protected MigrationJobInfo getJobInfo(final String jobId) {
+        MigrationJobInfo result = new MigrationJobInfo(jobId);
+        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        fillJobInfo(result, jobConfigPOJO);
+        MigrationJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
+        result.setTable(jobConfig.getSourceTableName());
+        return result;
+    }
+    
     @Override
     public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration yamlJobConfig) {
         YamlMigrationJobConfiguration config = (YamlMigrationJobConfiguration) yamlJobConfig;
@@ -167,7 +177,8 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
         return getJobConfiguration(getElasticJobConfigPOJO(jobId));
     }
     
-    private MigrationJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
+    @Override
+    protected MigrationJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
         return YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
     }
     
@@ -220,19 +231,6 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
         return new MigrationProcessContext(pipelineJobConfig.getJobId(), processConfig);
     }
     
-    @Override
-    protected MigrationJobInfo getJobInfo(final String jobName) {
-        MigrationJobInfo result = new MigrationJobInfo(jobName);
-        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(result.getJobId());
-        MigrationJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
-        result.setActive(!jobConfigPOJO.isDisabled());
-        result.setShardingTotalCount(jobConfig.getJobShardingCount());
-        result.setTable(jobConfig.getSourceTableName());
-        result.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time"));
-        result.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time"));
-        return result;
-    }
-    
     @Override
     public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final String jobId) {
         checkModeConfig();

Reply via email to