This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5f3946ece36 Refactor pipeline getJobItemProgress return Optional
(#23108)
5f3946ece36 is described below
commit 5f3946ece365bb5b034be19f1dc87677e1b0795e
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Dec 28 10:48:47 2022 +0800
Refactor pipeline getJobItemProgress return Optional (#23108)
* Refactor GovernanceRepositoryAPI.getJobItemProgress return Optional
* Refactor PipelineJobAPI.getJobItemProgress return Optional
* Compatible with empty offset value
---
.../data/pipeline/cdc/api/impl/CDCJobAPIImpl.java | 3 ++-
.../data/pipeline/cdc/core/job/CDCJob.java | 6 +++--
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 2 +-
.../pipeline/core/api/GovernanceRepositoryAPI.java | 2 +-
.../core/api/InventoryIncrementalJobAPI.java | 3 ++-
.../data/pipeline/core/api/PipelineJobAPI.java | 2 +-
.../AbstractInventoryIncrementalJobAPIImpl.java | 23 ++++++++---------
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 5 ++--
.../consistencycheck/ConsistencyCheckJob.java | 8 ++++--
.../api/impl/ConsistencyCheckJobAPI.java | 29 +++++++++++-----------
.../pipeline/scenario/migration/MigrationJob.java | 5 ++--
.../migration/prepare/MigrationJobPreparer.java | 9 ++++---
.../handler/cdc/fixture/FixtureCDCJobAPI.java | 2 +-
.../api/impl/GovernanceRepositoryAPIImplTest.java | 13 +++++-----
.../core/api/impl/MigrationJobAPITest.java | 7 +++---
15 files changed, 63 insertions(+), 56 deletions(-)
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
index 7b93604f380..7d7c6b271be 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
@@ -84,6 +84,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -232,7 +233,7 @@ public final class CDCJobAPIImpl extends
AbstractInventoryIncrementalJobAPIImpl
}
@Override
- public InventoryIncrementalJobItemProgress getJobItemProgress(final String
jobId, final int shardingItem) {
+ public Optional<InventoryIncrementalJobItemProgress>
getJobItemProgress(final String jobId, final int shardingItem) {
// TODO to be implemented
return null;
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 8dbe1a30974..1b7e68c95aa 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -38,6 +38,8 @@ import
org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTas
import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import java.util.Optional;
+
/**
* CDC job.
*/
@@ -62,10 +64,10 @@ public final class CDCJob extends AbstractSimplePipelineJob
{
protected PipelineJobItemContext buildPipelineJobItemContext(final
ShardingContext shardingContext) {
int shardingItem = shardingContext.getShardingItem();
CDCJobConfiguration jobConfig = new
YamlCDCJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- InventoryIncrementalJobItemProgress initProgress =
jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
+ Optional<InventoryIncrementalJobItemProgress> initProgress =
jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
CDCProcessContext jobProcessContext =
jobAPI.buildPipelineProcessContext(jobConfig);
CDCTaskConfiguration taskConfig =
jobAPI.buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
- return new CDCJobItemContext(jobConfig, shardingItem, initProgress,
jobProcessContext, taskConfig, dataSourceManager, importerConnector);
+ return new CDCJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager,
importerConnector);
}
protected PipelineTasksRunner buildPipelineTasksRunner(final
PipelineJobItemContext pipelineJobItemContext) {
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index f90bac04580..8a87b8f1497 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -54,7 +54,7 @@ public final class CDCJobPreparer {
* @param jobItemContext job item context
*/
public void prepare(final CDCJobItemContext jobItemContext) {
- if (null == jobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem())) {
+ if (!jobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem()).isPresent()) {
jobAPI.persistJobItemProgress(jobItemContext);
}
if (jobItemContext.isStopping()) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index 75fb805a811..b65b5632713 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -55,7 +55,7 @@ public interface GovernanceRepositoryAPI {
* @param shardingItem sharding item
* @return job item progress
*/
- String getJobItemProgress(String jobId, int shardingItem);
+ Optional<String> getJobItemProgress(String jobId, int shardingItem);
/**
* Get latest check job id.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index f459fbcd91e..8e47caca94c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -30,6 +30,7 @@ import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
/**
@@ -60,7 +61,7 @@ public interface InventoryIncrementalJobAPI extends
PipelineJobAPI {
Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(PipelineJobConfiguration pipelineJobConfig);
@Override
- InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int
shardingItem);
+ Optional<InventoryIncrementalJobItemProgress> getJobItemProgress(String
jobId, int shardingItem);
/**
* Get job infos.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index 0508ad0aac8..2628da172ae 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -131,7 +131,7 @@ public interface PipelineJobAPI extends TypedSPI {
* @param shardingItem sharding item
* @return job item progress, may be null
*/
- PipelineJobItemProgress getJobItemProgress(String jobId, int shardingItem);
+ Optional<? extends PipelineJobItemProgress> getJobItemProgress(String
jobId, int shardingItem);
/**
* Update job item status.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 0398d914d5a..58b91b400d6 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.api.impl;
-import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
@@ -96,11 +95,9 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl
extends AbstractPip
String jobId = jobConfig.getJobId();
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
return IntStream.range(0,
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map,
each) -> {
- InventoryIncrementalJobItemProgress jobItemProgress =
getJobItemProgress(jobId, each);
- if (null != jobItemProgress) {
- jobItemProgress.setActive(!jobConfigPOJO.isDisabled());
- }
- map.put(each, jobItemProgress);
+ Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
getJobItemProgress(jobId, each);
+ jobItemProgress.ifPresent(progress ->
progress.setActive(!jobConfigPOJO.isDisabled()));
+ map.put(each, jobItemProgress.orElse(null));
}, LinkedHashMap::putAll);
}
@@ -157,19 +154,19 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
}
@Override
- public InventoryIncrementalJobItemProgress getJobItemProgress(final String
jobId, final int shardingItem) {
- String data =
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId,
shardingItem);
- return Strings.isNullOrEmpty(data) ? null :
jobItemProgressSwapper.swapToObject(YamlEngine.unmarshal(data,
YamlInventoryIncrementalJobItemProgress.class));
+ public Optional<InventoryIncrementalJobItemProgress>
getJobItemProgress(final String jobId, final int shardingItem) {
+ Optional<String> progress =
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId,
shardingItem);
+ return progress.map(s ->
jobItemProgressSwapper.swapToObject(YamlEngine.unmarshal(s,
YamlInventoryIncrementalJobItemProgress.class)));
}
@Override
public void updateJobItemStatus(final String jobId, final int
shardingItem, final JobStatus status) {
- InventoryIncrementalJobItemProgress jobItemProgress =
getJobItemProgress(jobId, shardingItem);
- if (null == jobItemProgress) {
+ Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
getJobItemProgress(jobId, shardingItem);
+ if (!jobItemProgress.isPresent()) {
return;
}
- jobItemProgress.setStatus(status);
-
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId,
shardingItem,
YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress)));
+ jobItemProgress.get().setStatus(status);
+
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId,
shardingItem,
YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress.get())));
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 37c27a7942e..d28a50a2519 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -61,8 +61,9 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
}
@Override
- public String getJobItemProgress(final String jobId, final int
shardingItem) {
- return
repository.getDirectly(PipelineMetaDataNode.getJobOffsetItemPath(jobId,
shardingItem));
+ public Optional<String> getJobItemProgress(final String jobId, final int
shardingItem) {
+ String text =
repository.getDirectly(PipelineMetaDataNode.getJobOffsetItemPath(jobId,
shardingItem));
+ return Strings.isNullOrEmpty(text) ? Optional.empty() :
Optional.of(text);
}
@Override
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 406a67b37f8..88a11c31209 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -24,11 +24,14 @@ import
org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task.ConsistencyCheckTasksRunner;
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import java.util.Optional;
+
/**
* Consistency check job.
*/
@@ -38,8 +41,9 @@ public final class ConsistencyCheckJob extends
AbstractSimplePipelineJob {
@Override
public ConsistencyCheckJobItemContext buildPipelineJobItemContext(final
ShardingContext shardingContext) {
ConsistencyCheckJobConfiguration jobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- ConsistencyCheckJobItemProgress jobItemProgress =
(ConsistencyCheckJobItemProgress)
getJobAPI().getJobItemProgress(jobConfig.getJobId(),
shardingContext.getShardingItem());
- return new ConsistencyCheckJobItemContext(jobConfig,
shardingContext.getShardingItem(), JobStatus.RUNNING, jobItemProgress);
+ ConsistencyCheckJobAPI jobAPI = (ConsistencyCheckJobAPI) getJobAPI();
+ Optional<ConsistencyCheckJobItemProgress> jobItemProgress =
jobAPI.getJobItemProgress(jobConfig.getJobId(),
shardingContext.getShardingItem());
+ return new ConsistencyCheckJobItemContext(jobConfig,
shardingContext.getShardingItem(), JobStatus.RUNNING,
jobItemProgress.orElse(null));
}
@Override
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index b4d781ea6ba..463902206e8 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -17,7 +17,6 @@
package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl;
-import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
@@ -30,7 +29,6 @@ import
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContex
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobItemInfo;
import
org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
@@ -96,8 +94,8 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
String parentJobId = param.getJobId();
Optional<String> latestCheckJobId =
repositoryAPI.getLatestCheckJobId(parentJobId);
if (latestCheckJobId.isPresent()) {
- PipelineJobItemProgress progress =
getJobItemProgress(latestCheckJobId.get(), 0);
- if (null == progress || JobStatus.FINISHED !=
progress.getStatus()) {
+ Optional<ConsistencyCheckJobItemProgress> progress =
getJobItemProgress(latestCheckJobId.get(), 0);
+ if (!progress.isPresent() || JobStatus.FINISHED !=
progress.get().getStatus()) {
log.info("check job already exists and status is not FINISHED,
progress={}", progress);
throw new
UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get());
}
@@ -141,26 +139,26 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
}
@Override
- public ConsistencyCheckJobItemProgress getJobItemProgress(final String
jobId, final int shardingItem) {
- String progress =
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId,
shardingItem);
- return Strings.isNullOrEmpty(progress) ? null :
swapper.swapToObject(YamlEngine.unmarshal(progress,
YamlConsistencyCheckJobItemProgress.class, true));
+ public Optional<ConsistencyCheckJobItemProgress> getJobItemProgress(final
String jobId, final int shardingItem) {
+ Optional<String> progress =
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId,
shardingItem);
+ return progress.map(s -> swapper.swapToObject(YamlEngine.unmarshal(s,
YamlConsistencyCheckJobItemProgress.class, true)));
}
@Override
public void updateJobItemStatus(final String jobId, final int
shardingItem, final JobStatus status) {
- ConsistencyCheckJobItemProgress jobItemProgress =
getJobItemProgress(jobId, shardingItem);
- if (null == jobItemProgress) {
+ Optional<ConsistencyCheckJobItemProgress> jobItemProgress =
getJobItemProgress(jobId, shardingItem);
+ if (!jobItemProgress.isPresent()) {
log.warn("updateJobItemStatus, jobProgress is null, jobId={},
shardingItem={}", jobId, shardingItem);
return;
}
- jobItemProgress.setStatus(status);
-
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId,
shardingItem,
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress)));
+ jobItemProgress.get().setStatus(status);
+
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId,
shardingItem,
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
}
@Override
public void startDisabledJob(final String jobId) {
- PipelineJobItemProgress jobProgress = getJobItemProgress(jobId, 0);
- if (null != jobProgress && JobStatus.FINISHED ==
jobProgress.getStatus()) {
+ Optional<ConsistencyCheckJobItemProgress> jobItemProgress =
getJobItemProgress(jobId, 0);
+ if (jobItemProgress.isPresent() && JobStatus.FINISHED ==
jobItemProgress.get().getStatus()) {
log.info("job status is FINISHED, ignore, jobId={}", jobId);
return;
}
@@ -223,11 +221,12 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
Optional<String> latestCheckJobId =
PipelineAPIFactory.getGovernanceRepositoryAPI().getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(),
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
String checkJobId = latestCheckJobId.get();
- ConsistencyCheckJobItemProgress jobItemProgress =
getJobItemProgress(checkJobId, 0);
+ Optional<ConsistencyCheckJobItemProgress> progressOptional =
getJobItemProgress(checkJobId, 0);
ConsistencyCheckJobItemInfo result = new ConsistencyCheckJobItemInfo();
- if (null == jobItemProgress) {
+ if (!progressOptional.isPresent()) {
return result;
}
+ ConsistencyCheckJobItemProgress jobItemProgress =
progressOptional.get();
LocalDateTime checkBeginTime = new
Timestamp(jobItemProgress.getCheckBeginTimeMillis()).toLocalDateTime();
if (null == jobItemProgress.getRecordsCount() || null ==
jobItemProgress.getCheckedRecordsCount()) {
result.setFinishedPercentage(0);
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 59d14624d0f..3ac7d7559f5 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -37,6 +37,7 @@ import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigur
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import java.sql.SQLException;
+import java.util.Optional;
/**
* Migration job.
@@ -56,10 +57,10 @@ public final class MigrationJob extends
AbstractSimplePipelineJob {
protected InventoryIncrementalJobItemContext
buildPipelineJobItemContext(final ShardingContext shardingContext) {
int shardingItem = shardingContext.getShardingItem();
MigrationJobConfiguration jobConfig = new
YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- InventoryIncrementalJobItemProgress initProgress =
jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
+ Optional<InventoryIncrementalJobItemProgress> initProgress =
jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
MigrationProcessContext jobProcessContext =
jobAPI.buildPipelineProcessContext(jobConfig);
MigrationTaskConfiguration taskConfig =
jobAPI.buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
- return new MigrationJobItemContext(jobConfig, shardingItem,
initProgress, jobProcessContext, taskConfig, dataSourceManager);
+ return new MigrationJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
}
@Override
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 1edd4529270..ae007aa6a0c 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -49,6 +49,7 @@ import
org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
import java.sql.SQLException;
import java.util.Collections;
+import java.util.Optional;
/**
* Migration job preparer.
@@ -91,7 +92,7 @@ public final class MigrationJobPreparer {
MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
String lockName = "prepare-" + jobConfig.getJobId();
LockContext lockContext =
PipelineContext.getContextManager().getInstanceContext().getLockContext();
- if (null == jobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem())) {
+ if (!jobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem()).isPresent()) {
jobAPI.persistJobItemProgress(jobItemContext);
}
LockDefinition lockDefinition = new GlobalLockDefinition(lockName);
@@ -99,9 +100,9 @@ public final class MigrationJobPreparer {
if (lockContext.tryLock(lockDefinition, 180000)) {
log.info("try lock success, jobId={}, shardingItem={}, cost {}
ms", jobConfig.getJobId(), jobItemContext.getShardingItem(),
System.currentTimeMillis() - startTimeMillis);
try {
- InventoryIncrementalJobItemProgress jobItemProgress =
jobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
- JobStatus currentStatus = null != jobItemProgress ?
jobItemProgress.getStatus() : null;
- boolean prepareFlag = null == jobItemProgress ||
JobStatus.PREPARING.equals(currentStatus) ||
JobStatus.RUNNING.equals(currentStatus)
+ Optional<InventoryIncrementalJobItemProgress> jobItemProgress
= jobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+ JobStatus currentStatus =
jobItemProgress.map(InventoryIncrementalJobItemProgress::getStatus).orElse(null);
+ boolean prepareFlag = !jobItemProgress.isPresent() ||
JobStatus.PREPARING.equals(currentStatus) ||
JobStatus.RUNNING.equals(currentStatus)
|| JobStatus.PREPARING_FAILURE.equals(currentStatus);
if (prepareFlag) {
jobItemContext.setStatus(JobStatus.PREPARING);
diff --git
a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
index 53e2c2c29b0..8617049ab4b 100644
---
a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
+++
b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
@@ -108,7 +108,7 @@ public final class FixtureCDCJobAPI implements
InventoryIncrementalJobAPI, CDCJo
}
@Override
- public InventoryIncrementalJobItemProgress getJobItemProgress(final String
jobId, final int shardingItem) {
+ public Optional<InventoryIncrementalJobItemProgress>
getJobItemProgress(final String jobId, final int shardingItem) {
return null;
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 3caab14c0da..a836881ca50 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -29,12 +29,12 @@ import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.importer.connector.DataSourceImporterConnector;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
-import
org.apache.shardingsphere.data.pipeline.core.importer.connector.DataSourceImporterConnector;
import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -48,6 +48,7 @@ import java.sql.Types;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -57,7 +58,6 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -75,8 +75,9 @@ public final class GovernanceRepositoryAPIImplTest {
public void assertPersistJobProgress() {
MigrationJobItemContext jobItemContext = mockJobItemContext();
governanceRepositoryAPI.persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue");
- String actual =
governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
- assertThat(actual, is("testValue"));
+ Optional<String> actual =
governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+ assertTrue(actual.isPresent());
+ assertThat(actual.get(), is("testValue"));
}
@Test
@@ -94,8 +95,8 @@ public final class GovernanceRepositoryAPIImplTest {
public void assertDeleteJob() {
governanceRepositoryAPI.persist(DataPipelineConstants.DATA_PIPELINE_ROOT +
"/1", "");
governanceRepositoryAPI.deleteJob("1");
- String actual = governanceRepositoryAPI.getJobItemProgress("1", 0);
- assertNull(actual);
+ Optional<String> actual =
governanceRepositoryAPI.getJobItemProgress("1", 0);
+ assertFalse(actual.isPresent());
}
@Test
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
index 80d6ddfa275..331cd7c3487 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
@@ -65,7 +65,6 @@ import java.util.Optional;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@@ -235,9 +234,9 @@ public final class MigrationJobAPITest {
MigrationJobItemContext jobItemContext =
PipelineContextUtil.mockMigrationJobItemContext(jobConfig);
jobAPI.persistJobItemProgress(jobItemContext);
jobAPI.updateJobItemStatus(jobConfig.getJobId(), 0,
JobStatus.FINISHED);
- InventoryIncrementalJobItemProgress actual =
jobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
- assertNotNull(actual);
- assertThat(actual.getStatus(), is(JobStatus.FINISHED));
+ Optional<InventoryIncrementalJobItemProgress> actual =
jobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+ assertTrue(actual.isPresent());
+ assertThat(actual.get().getStatus(), is(JobStatus.FINISHED));
}
@Test