This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 80ea2b529e4 Refactor AbstractSeparablePipelineJob and AbstractInseparablePipelineJob (#32749)
80ea2b529e4 is described below
commit 80ea2b529e4e7ab27a39008f57373f2ff2ce2851
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 31 20:52:17 2024 +0800
Refactor AbstractSeparablePipelineJob and AbstractInseparablePipelineJob (#32749)
* Refactor AbstractSeparablePipelineJob and AbstractInseparablePipelineJob
* Refactor AbstractSeparablePipelineJob and AbstractInseparablePipelineJob
---
.../core/job/AbstractInseparablePipelineJob.java | 20 ++++++----------
.../core/job/AbstractSeparablePipelineJob.java | 28 ++++++++--------------
.../pipeline/core/job/type/PipelineJobType.java | 7 ++++++
.../pipeline/core/job/type/FixtureJobType.java | 5 ++++
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 4 ++--
.../data/pipeline/cdc/CDCJobType.java | 5 ++++
.../data/pipeline/cdc/api/CDCJobAPI.java | 2 +-
.../consistencycheck/ConsistencyCheckJob.java | 5 ++--
.../consistencycheck/ConsistencyCheckJobType.java | 5 ++++
...tencyCheckJobConfigurationChangedProcessor.java | 2 +-
.../pipeline/scenario/migration/MigrationJob.java | 5 ++--
.../scenario/migration/MigrationJobType.java | 5 ++++
.../MigrationJobConfigurationChangedProcessor.java | 2 +-
.../consistencycheck/ConsistencyCheckJobTest.java | 2 +-
14 files changed, 56 insertions(+), 41 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index b6d4c28eac5..bcbcca15ede 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -59,19 +59,6 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobConfig
private final PipelineJobRunnerManager jobRunnerManager;
- private final TransmissionProcessContext jobProcessContext;
-
- protected AbstractInseparablePipelineJob(final String jobId, final
PipelineJobRunnerManager jobRunnerManager) {
- this.jobRunnerManager = jobRunnerManager;
- jobProcessContext = createTransmissionProcessContext(jobId);
- }
-
- private TransmissionProcessContext createTransmissionProcessContext(final
String jobId) {
- PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.fillInDefaultValue(
- new
PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId),
PipelineJobIdUtils.parseJobType(jobId).getType()));
- return new TransmissionProcessContext(jobId, processConfig);
- }
-
@SuppressWarnings("unchecked")
@Override
public final void execute(final ShardingContext shardingContext) {
@@ -79,6 +66,7 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobConfig
log.info("Execute job {}", jobId);
PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
T jobConfig = (T)
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+ TransmissionProcessContext jobProcessContext =
jobType.isTransmissionJob() ? createTransmissionProcessContext(jobId) : null;
Collection<I> jobItemContexts = new LinkedList<>();
PipelineJobItemManager<P> jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
@@ -105,6 +93,12 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobConfig
executeIncrementalTasks(jobItemContexts, jobItemManager);
}
+ private TransmissionProcessContext createTransmissionProcessContext(final
String jobId) {
+ PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.fillInDefaultValue(
+ new
PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId),
PipelineJobIdUtils.parseJobType(jobId).getType()));
+ return new TransmissionProcessContext(jobId, processConfig);
+ }
+
protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P
jobItemProgress, TransmissionProcessContext jobProcessContext);
protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 5afdc66cb8b..c77303c1feb 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
@@ -46,29 +47,13 @@ import java.sql.SQLException;
* @param <I> type of pipeline job item context
* @param <P> type of pipeline job item progress
*/
+@RequiredArgsConstructor
+@Getter
@Slf4j
public abstract class AbstractSeparablePipelineJob<T extends
PipelineJobConfiguration, I extends PipelineJobItemContext, P extends
PipelineJobItemProgress> implements PipelineJob {
- @Getter
private final PipelineJobRunnerManager jobRunnerManager;
- private final TransmissionProcessContext jobProcessContext;
-
- protected AbstractSeparablePipelineJob(final String jobId) {
- this(jobId, true);
- }
-
- protected AbstractSeparablePipelineJob(final String jobId, final boolean
isTransmissionProcessContextNeeded) {
- jobRunnerManager = new PipelineJobRunnerManager();
- jobProcessContext = isTransmissionProcessContextNeeded ?
createTransmissionProcessContext(jobId) : null;
- }
-
- private TransmissionProcessContext createTransmissionProcessContext(final
String jobId) {
- PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.fillInDefaultValue(
- new
PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId),
PipelineJobIdUtils.parseJobType(jobId).getType()));
- return new TransmissionProcessContext(jobId, processConfig);
- }
-
@Override
public final void execute(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
@@ -81,6 +66,7 @@ public abstract class AbstractSeparablePipelineJob<T extends
PipelineJobConfigur
PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
PipelineJobConfigurationManager jobConfigManager = new
PipelineJobConfigurationManager(jobType);
T jobConfig = jobConfigManager.getJobConfiguration(jobId);
+ TransmissionProcessContext jobProcessContext =
jobType.isTransmissionJob() ? createTransmissionProcessContext(jobId) : null;
PipelineJobItemManager<P> jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
P jobItemProgress =
jobItemManager.getProgress(shardingContext.getJobName(),
shardingItem).orElse(null);
boolean started = false;
@@ -118,6 +104,12 @@ public abstract class AbstractSeparablePipelineJob<T
extends PipelineJobConfigur
return true;
}
+ private TransmissionProcessContext createTransmissionProcessContext(final
String jobId) {
+ PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.fillInDefaultValue(
+ new
PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId),
PipelineJobIdUtils.parseJobType(jobId).getType()));
+ return new TransmissionProcessContext(jobId, processConfig);
+ }
+
protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P
jobItemProgress, TransmissionProcessContext jobProcessContext);
protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
index a217f0ec2b4..aa9886233f6 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
@@ -48,6 +48,13 @@ public interface PipelineJobType extends TypedSPI {
*/
String getCode();
+ /**
+ * Is transmission job.
+ *
+ * @return is transmission job or not
+ */
+ boolean isTransmissionJob();
+
/**
* Get YAML pipeline job configuration swapper.
*
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
index 2fe7575ae48..c6988111e34 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
@@ -36,6 +36,11 @@ public final class FixtureJobType implements PipelineJobType
{
return "00";
}
+ @Override
+ public boolean isTransmissionJob() {
+ return true;
+ }
+
@Override
public <Y extends YamlConfiguration, T extends PipelineJobConfiguration>
YamlPipelineJobConfigurationSwapper<Y, T> getYamlJobConfigurationSwapper() {
return null;
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index c210fffc0e0..ad69003cd6a 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -75,8 +75,8 @@ public final class CDCJob extends
AbstractInseparablePipelineJob<CDCJobConfigura
@Getter
private final PipelineSink sink;
- public CDCJob(final String jobId, final PipelineSink sink) {
- super(jobId, new PipelineJobRunnerManager(new
CDCJobRunnerCleaner(sink)));
+ public CDCJob(final PipelineSink sink) {
+ super(new PipelineJobRunnerManager(new CDCJobRunnerCleaner(sink)));
this.sink = sink;
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
index 705b5ee90b5..be6e1d76225 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
@@ -36,6 +36,11 @@ public final class CDCJobType implements PipelineJobType {
return "03";
}
+ @Override
+ public boolean isTransmissionJob() {
+ return true;
+ }
+
@SuppressWarnings("unchecked")
@Override
public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 805a15c1666..2e9d6187cf8 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -223,7 +223,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
* @param sink sink
*/
public void start(final String jobId, final PipelineSink sink) {
- CDCJob job = new CDCJob(jobId, sink);
+ CDCJob job = new CDCJob(sink);
PipelineJobRegistry.add(jobId, job);
enable(jobId);
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 24639b29bde..2c73aef56b7 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -31,8 +32,8 @@ import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task.Co
*/
public final class ConsistencyCheckJob extends
AbstractSeparablePipelineJob<ConsistencyCheckJobConfiguration,
ConsistencyCheckJobItemContext, ConsistencyCheckJobItemProgress> {
- public ConsistencyCheckJob(final String jobId) {
- super(jobId, false);
+ public ConsistencyCheckJob() {
+ super(new PipelineJobRunnerManager());
}
@Override
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
index 1a5102018c6..2bfc557d23d 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
@@ -32,6 +32,11 @@ public final class ConsistencyCheckJobType implements
PipelineJobType {
return "02";
}
+ @Override
+ public boolean isTransmissionJob() {
+ return false;
+ }
+
@SuppressWarnings("unchecked")
@Override
public YamlConsistencyCheckJobConfigurationSwapper
getYamlJobConfigurationSwapper() {
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
index 1a3d470f8b1..4afb66b446a 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
@@ -29,7 +29,7 @@ public final class
ConsistencyCheckJobConfigurationChangedProcessor implements J
@Override
public PipelineJob createJob(final ConsistencyCheckJobConfiguration
jobConfig) {
- return new ConsistencyCheckJob(jobConfig.getJobId());
+ return new ConsistencyCheckJob();
}
@Override
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 6ae23369c6c..09e140a72ef 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -25,6 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfigurati
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
+import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
@@ -55,8 +56,8 @@ public final class MigrationJob extends
AbstractSeparablePipelineJob<MigrationJo
private final MigrationJobPreparer jobPreparer = new
MigrationJobPreparer();
- public MigrationJob(final String jobId) {
- super(jobId);
+ public MigrationJob() {
+ super(new PipelineJobRunnerManager());
}
@Override
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
index cf4ba6b392e..b6576d42735 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
@@ -46,6 +46,11 @@ public final class MigrationJobType implements
PipelineJobType {
return "01";
}
+ @Override
+ public boolean isTransmissionJob() {
+ return true;
+ }
+
@SuppressWarnings("unchecked")
@Override
public YamlMigrationJobConfigurationSwapper
getYamlJobConfigurationSwapper() {
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
index af1cb8fb894..39545ca0355 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
@@ -32,7 +32,7 @@ public final class MigrationJobConfigurationChangedProcessor
implements JobConfi
@Override
public PipelineJob createJob(final MigrationJobConfiguration jobConfig) {
- return new MigrationJob(jobConfig.getJobId());
+ return new MigrationJob();
}
@Override
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 19de73a4f19..16468324e59 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -59,7 +59,7 @@ class ConsistencyCheckJobTest {
Map<String, Object> expectTableCheckPosition =
Collections.singletonMap("t_order", 100);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId,
0,
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
- ConsistencyCheckJob consistencyCheckJob = new
ConsistencyCheckJob("j02");
+ ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob();
ConsistencyCheckJobConfiguration jobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(createYamlConsistencyCheckJobConfiguration(checkJobId));
PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(new
ConsistencyCheckJobType().getYamlJobItemProgressSwapper());
Optional<ConsistencyCheckJobItemProgress> jobItemProgress =
jobItemManager.getProgress(jobConfig.getJobId(), 0);