This is an automated email from the ASF dual-hosted git repository. menghaoran pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new cdcfd5b Add getUncompletedJobIds method for ScalingAPI (#13306) cdcfd5b is described below commit cdcfd5bdb30993efaf024128b251dae05cca5fb1 Author: Hongsheng Zhong <sand...@126.com> AuthorDate: Wed Oct 27 13:41:29 2021 +0800 Add getUncompletedJobIds method for ScalingAPI (#13306) * Add getUncompletedJobIds method for ScalingAPI * Unit test --- .../scaling/core/api/ScalingAPI.java | 8 +++++ .../scaling/core/api/impl/ScalingAPIImpl.java | 42 +++++++++++++++++++--- .../scaling/core/api/impl/ScalingAPIImplTest.java | 3 ++ 3 files changed, 49 insertions(+), 4 deletions(-) diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java index 95bb14d..746ac6b 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java @@ -40,6 +40,14 @@ public interface ScalingAPI { List<JobInfo> list(); /** + * Get uncompleted job ids of schema. + * + * @param schemaName schema name + * @return uncompleted job ids + */ + List<Long> getUncompletedJobIds(String schemaName); + + /** * Start scaling job by id. * * @param jobId job id diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java index a1d15db..aa1b619 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java @@ -19,6 +19,7 @@ package org.apache.shardingsphere.scaling.core.api.impl; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; +import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo; import org.apache.shardingsphere.infra.config.TypedSPIConfiguration; import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration; import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory; @@ -33,8 +34,10 @@ import org.apache.shardingsphere.scaling.core.api.ScalingDataConsistencyCheckAlg import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant; import org.apache.shardingsphere.scaling.core.common.exception.DataCheckFailException; import org.apache.shardingsphere.scaling.core.common.exception.ScalingJobNotFoundException; +import org.apache.shardingsphere.scaling.core.config.HandleConfiguration; import org.apache.shardingsphere.scaling.core.config.JobConfiguration; import org.apache.shardingsphere.scaling.core.config.ScalingContext; +import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration; import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfigurationWrap; import org.apache.shardingsphere.scaling.core.job.JobContext; import org.apache.shardingsphere.scaling.core.job.JobStatus; @@ -61,6 +64,7 @@ import java.util.Optional; import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; @Slf4j public final class ScalingAPIImpl implements ScalingAPI { @@ -69,10 +73,11 @@ public final class ScalingAPIImpl implements ScalingAPI { @Override public List<JobInfo> list() { - return ScalingAPIFactory.getJobStatisticsAPI().getAllJobsBriefInfo().stream() - .filter(each -> !each.getJobName().startsWith("_")) - .map(each -> getJobInfo(each.getJobName())) - .collect(Collectors.toList()); + return getJobBriefInfos().map(each -> getJobInfo(each.getJobName())).collect(Collectors.toList()); + } + + private Stream<JobBriefInfo> getJobBriefInfos() { + return ScalingAPIFactory.getJobStatisticsAPI().getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_")); } private JobInfo getJobInfo(final String jobName) { @@ -89,6 +94,35 @@ public final class ScalingAPIImpl implements ScalingAPI { } @Override + public List<Long> getUncompletedJobIds(final String schemaName) { + return getJobBriefInfos().filter(each -> { + long jobId = Long.parseLong(each.getJobName()); + return isUncompletedJobOfSchema(schemaName, jobId); + }).map(each -> Long.parseLong(each.getJobName())).collect(Collectors.toList()); + } + + private boolean isUncompletedJobOfSchema(final String schemaName, final long jobId) { + JobConfigurationPOJO jobConfigPOJO; + try { + jobConfigPOJO = getElasticJobConfigPOJO(jobId); + } catch (final ScalingJobNotFoundException ex) { + log.warn("scaling job not found, jobId={}", jobId); + return false; + } + JobConfiguration jobConfig = getJobConfig(jobConfigPOJO); + HandleConfiguration handleConfig = jobConfig.getHandleConfig(); + WorkflowConfiguration workflowConfig; + if (null == handleConfig || null == (workflowConfig = handleConfig.getWorkflowConfig())) { + log.warn("handleConfig or workflowConfig null, jobId={}", jobId); + return false; + } + if (!schemaName.equals(workflowConfig.getSchemaName())) { + return false; + } + return !jobConfigPOJO.isDisabled(); + } + + @Override public void start(final long jobId) { log.info("Start scaling job {}", jobId); JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId); diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java index 3119418..23981c9 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java @@ -44,6 +44,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -73,6 +74,8 @@ public final class ScalingAPIImplTest { assertTrue(jobInfo.isActive()); assertThat(jobInfo.getTables(), is("t_order")); assertThat(jobInfo.getShardingTotalCount(), is(1)); + List<Long> uncompletedJobIds = scalingAPI.getUncompletedJobIds("logic_db"); + assertThat(uncompletedJobIds.size(), is(0)); } private Optional<JobInfo> getJobInfo(final long jobId) {