This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new cdcfd5b  Add getUncompletedJobIds method for ScalingAPI (#13306)
cdcfd5b is described below

commit cdcfd5bdb30993efaf024128b251dae05cca5fb1
Author: Hongsheng Zhong <sand...@126.com>
AuthorDate: Wed Oct 27 13:41:29 2021 +0800

    Add getUncompletedJobIds method for ScalingAPI (#13306)
    
    * Add getUncompletedJobIds method for ScalingAPI
    
    * Unit test
---
 .../scaling/core/api/ScalingAPI.java               |  8 +++++
 .../scaling/core/api/impl/ScalingAPIImpl.java      | 42 +++++++++++++++++++---
 .../scaling/core/api/impl/ScalingAPIImplTest.java  |  3 ++
 3 files changed, 49 insertions(+), 4 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java
index 95bb14d..746ac6b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java
@@ -40,6 +40,14 @@ public interface ScalingAPI {
     List<JobInfo> list();
     
     /**
+     * Get uncompleted job ids of schema.
+     *
+     * @param schemaName schema name
+     * @return uncompleted job ids
+     */
+    List<Long> getUncompletedJobIds(String schemaName);
+    
+    /**
      * Start scaling job by id.
      *
      * @param jobId job id
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
index a1d15db..aa1b619 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.scaling.core.api.impl;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
 import org.apache.shardingsphere.infra.config.TypedSPIConfiguration;
 import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
 import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
@@ -33,8 +34,10 @@ import org.apache.shardingsphere.scaling.core.api.ScalingDataConsistencyCheckAlg
 import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
 import org.apache.shardingsphere.scaling.core.common.exception.DataCheckFailException;
 import org.apache.shardingsphere.scaling.core.common.exception.ScalingJobNotFoundException;
+import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration;
 import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfigurationWrap;
 import org.apache.shardingsphere.scaling.core.job.JobContext;
 import org.apache.shardingsphere.scaling.core.job.JobStatus;
@@ -61,6 +64,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 @Slf4j
 public final class ScalingAPIImpl implements ScalingAPI {
@@ -69,10 +73,11 @@ public final class ScalingAPIImpl implements ScalingAPI {
     
     @Override
     public List<JobInfo> list() {
-        return ScalingAPIFactory.getJobStatisticsAPI().getAllJobsBriefInfo().stream()
-                .filter(each -> !each.getJobName().startsWith("_"))
-                .map(each -> getJobInfo(each.getJobName()))
-                .collect(Collectors.toList());
+        return getJobBriefInfos().map(each -> getJobInfo(each.getJobName())).collect(Collectors.toList());
+    }
+    
+    private Stream<JobBriefInfo> getJobBriefInfos() {
+        return ScalingAPIFactory.getJobStatisticsAPI().getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"));
     }
     
     private JobInfo getJobInfo(final String jobName) {
@@ -89,6 +94,35 @@ public final class ScalingAPIImpl implements ScalingAPI {
     }
     
     @Override
+    public List<Long> getUncompletedJobIds(final String schemaName) {
+        return getJobBriefInfos().filter(each -> {
+            long jobId = Long.parseLong(each.getJobName());
+            return isUncompletedJobOfSchema(schemaName, jobId);
+        }).map(each -> Long.parseLong(each.getJobName())).collect(Collectors.toList());
+    }
+    
+    private boolean isUncompletedJobOfSchema(final String schemaName, final long jobId) {
+        JobConfigurationPOJO jobConfigPOJO;
+        try {
+            jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        } catch (final ScalingJobNotFoundException ex) {
+            log.warn("scaling job not found, jobId={}", jobId);
+            return false;
+        }
+        JobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
+        HandleConfiguration handleConfig = jobConfig.getHandleConfig();
+        WorkflowConfiguration workflowConfig;
+        if (null == handleConfig || null == (workflowConfig = handleConfig.getWorkflowConfig())) {
+            log.warn("handleConfig or workflowConfig null, jobId={}", jobId);
+            return false;
+        }
+        if (!schemaName.equals(workflowConfig.getSchemaName())) {
+            return false;
+        }
+        return !jobConfigPOJO.isDisabled();
+    }
+    
+    @Override
     public void start(final long jobId) {
         log.info("Start scaling job {}", jobId);
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
index 3119418..23981c9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
@@ -44,6 +44,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -73,6 +74,8 @@ public final class ScalingAPIImplTest {
         assertTrue(jobInfo.isActive());
         assertThat(jobInfo.getTables(), is("t_order"));
         assertThat(jobInfo.getShardingTotalCount(), is(1));
+        List<Long> uncompletedJobIds = scalingAPI.getUncompletedJobIds("logic_db");
+        assertThat(uncompletedJobIds.size(), is(0));
     }
     
     private Optional<JobInfo> getJobInfo(final long jobId) {

Reply via email to