This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f6338ac9327 Improve list() impl for pipeline job api (#20540)
f6338ac9327 is described below
commit f6338ac93278ff3c8538cecc09e910cdbad86340
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Aug 26 13:20:00 2022 +0800
Improve list() impl for pipeline job api (#20540)
* Improve list() impl
* Filter by job type for list()
---
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 14 ++++++++++--
.../scenario/migration/MigrationJobAPIImpl.java | 26 ++++++++++------------
2 files changed, 24 insertions(+), 16 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 4ba03f71e41..941d2027fb2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -130,10 +130,18 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
}
private Stream<JobBriefInfo> getJobBriefInfos() {
- return PipelineAPIFactory.getJobStatisticsAPI().getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"));
+ return PipelineAPIFactory.getJobStatisticsAPI().getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"))
+ .filter(each -> PipelineJobIdUtils.parseJobType(each.getJobName()) == getJobType());
}
- protected abstract PipelineJobInfo getJobInfo(String jobName);
+ protected abstract PipelineJobInfo getJobInfo(String jobId);
+
+ protected void fillJobInfo(final PipelineJobInfo jobInfo, final JobConfigurationPOJO jobConfigPOJO) {
+ jobInfo.setActive(!jobConfigPOJO.isDisabled());
+ jobInfo.setShardingTotalCount(jobConfigPOJO.getShardingTotalCount());
+ jobInfo.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time"));
+ jobInfo.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time"));
+ }
@Override
public Optional<String> start(final PipelineJobConfiguration jobConfig) {
@@ -165,6 +173,8 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
protected abstract YamlPipelineJobConfiguration swapToYamlJobConfiguration(PipelineJobConfiguration jobConfig);
+ protected abstract PipelineJobConfiguration getJobConfiguration(JobConfigurationPOJO jobConfigPOJO);
+
@Override
public void startDisabledJob(final String jobId) {
log.info("Start disabled pipeline job {}", jobId);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 9102c356ebe..6b4d8413a0b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -127,6 +127,16 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
return (List<MigrationJobInfo>) super.list();
}
+ @Override
+ protected MigrationJobInfo getJobInfo(final String jobId) {
+ MigrationJobInfo result = new MigrationJobInfo(jobId);
+ JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+ fillJobInfo(result, jobConfigPOJO);
+ MigrationJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
+ result.setTable(jobConfig.getSourceTableName());
+ return result;
+ }
+
@Override
public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration yamlJobConfig) {
YamlMigrationJobConfiguration config = (YamlMigrationJobConfiguration) yamlJobConfig;
@@ -167,7 +177,8 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
return getJobConfiguration(getElasticJobConfigPOJO(jobId));
}
- private MigrationJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
+ @Override
+ protected MigrationJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
}
@@ -220,19 +231,6 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
return new MigrationProcessContext(pipelineJobConfig.getJobId(), processConfig);
}
- @Override
- protected MigrationJobInfo getJobInfo(final String jobName) {
- MigrationJobInfo result = new MigrationJobInfo(jobName);
- JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(result.getJobId());
- MigrationJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
- result.setActive(!jobConfigPOJO.isDisabled());
- result.setShardingTotalCount(jobConfig.getJobShardingCount());
- result.setTable(jobConfig.getSourceTableName());
- result.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time"));
- result.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time"));
- return result;
- }
-
@Override
public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final String jobId) {
checkModeConfig();