This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fff648e94a1 Refactor PipelineJobConfigurationManager (#37109)
fff648e94a1 is described below
commit fff648e94a14c5d6770f64ae13899b8f4f4d6bd8
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 15 13:00:39 2025 +0800
Refactor PipelineJobConfigurationManager (#37109)
---
.../core/job/service/PipelineJobConfigurationManager.java | 10 +++++-----
.../data/pipeline/core/job/service/PipelineJobManager.java | 2 +-
.../data/pipeline/core/job/service/TransmissionJobManager.java | 2 +-
.../apache/shardingsphere/data/pipeline/cdc/CDCJobType.java | 2 +-
.../apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java | 4 ++--
.../data/pipeline/cdc/handler/CDCBackendHandler.java | 2 +-
.../scenario/consistencycheck/api/ConsistencyCheckJobAPI.java | 2 +-
.../consistencycheck/task/ConsistencyCheckTasksRunner.java | 2 +-
.../data/pipeline/scenario/migration/MigrationJobType.java | 2 +-
.../data/pipeline/scenario/migration/api/MigrationJobAPI.java | 4 ++--
.../distsql/handler/update/CheckMigrationJobExecutor.java | 2 +-
.../consistencycheck/api/ConsistencyCheckJobAPITest.java | 4 ++--
.../pipeline/scenario/migration/api/MigrationJobAPITest.java | 2 +-
13 files changed, 20 insertions(+), 20 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
index f9af4cc3f73..92d5deda4c5 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
@@ -22,7 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.core.execute.ShardingTotalCountUs
import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.job.config.yaml.swapper.YamlPipelineJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
import
org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
@@ -37,7 +37,7 @@ import java.util.Collections;
@RequiredArgsConstructor
public final class PipelineJobConfigurationManager {
- private final PipelineJobType jobType;
+ private final PipelineJobOption jobOption;
/**
* Get job configuration.
@@ -48,7 +48,7 @@ public final class PipelineJobConfigurationManager {
*/
@SuppressWarnings("unchecked")
public <T extends PipelineJobConfiguration> T getJobConfiguration(final
String jobId) {
- return (T)
jobType.getOption().getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
+ return (T)
jobOption.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
}
/**
@@ -61,8 +61,8 @@ public final class PipelineJobConfigurationManager {
public JobConfigurationPOJO convertToJobConfigurationPOJO(final
PipelineJobConfiguration jobConfig) {
JobConfigurationPOJO result = new JobConfigurationPOJO();
result.setJobName(jobConfig.getJobId());
-
result.setShardingTotalCount(jobType.getOption().isForceNoShardingWhenConvertToJobConfigurationPOJO()
? 1 : jobConfig.getJobShardingCount());
- YamlPipelineJobConfigurationSwapper swapper =
jobType.getOption().getYamlJobConfigurationSwapper();
+
result.setShardingTotalCount(jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO()
? 1 : jobConfig.getJobShardingCount());
+ YamlPipelineJobConfigurationSwapper swapper =
jobOption.getYamlJobConfigurationSwapper();
result.setJobParameter(YamlEngine.marshal(swapper.swapToYamlConfiguration(jobConfig)));
String createTimeFormat =
LocalDateTime.now().format(DateTimeFormatterFactory.getDatetimeFormatter());
result.getProps().setProperty("create_time", createTimeFormat);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index dcd238cc377..4738540975c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -69,7 +69,7 @@ public final class PipelineJobManager {
return;
}
governanceFacade.getJobFacade().getJob().create(jobId,
jobType.getOption().getJobClass());
- governanceFacade.getJobFacade().getConfiguration().persist(jobId, new
PipelineJobConfigurationManager(jobType).convertToJobConfigurationPOJO(jobConfig));
+ governanceFacade.getJobFacade().getConfiguration().persist(jobId, new
PipelineJobConfigurationManager(jobType.getOption()).convertToJobConfigurationPOJO(jobConfig));
}
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index f47efd748e6..d9f8277573a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -53,7 +53,7 @@ public final class TransmissionJobManager {
* @return job item infos
*/
public Collection<TransmissionJobItemInfo> getJobItemInfos(final String
jobId) {
- PipelineJobConfiguration jobConfig = new
PipelineJobConfigurationManager(jobType).getJobConfiguration(jobId);
+ PipelineJobConfiguration jobConfig = new
PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(jobId);
long startTimeMillis =
Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
Map<Integer, TransmissionJobItemProgress> jobProgress =
getJobProgress(jobConfig);
List<TransmissionJobItemInfo> result = new LinkedList<>();
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
index a0264488739..78782ff27e3 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
@@ -38,7 +38,7 @@ public final class CDCJobType implements PipelineJobType {
@Override
public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
- CDCJobConfiguration jobConfig = new
PipelineJobConfigurationManager(new
CDCJobType()).getJobConfiguration(jobMetaData.getJobId());
+ CDCJobConfiguration jobConfig = new
PipelineJobConfigurationManager(getOption()).getJobConfiguration(jobMetaData.getJobId());
return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(),
String.join(", ", jobConfig.getSchemaTableNames()));
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 1eb93f50767..72b54107780 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -110,7 +110,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
public CDCJobAPI() {
jobType = new CDCJobType();
jobManager = new PipelineJobManager(jobType);
- jobConfigManager = new PipelineJobConfigurationManager(jobType);
+ jobConfigManager = new
PipelineJobConfigurationManager(jobType.getOption());
dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
pipelineDataSourceConfigSwapper = new
YamlPipelineDataSourceConfigurationSwapper();
@@ -285,7 +285,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
* @return job item infos
*/
public Collection<CDCJobItemInfo> getJobItemInfos(final String jobId) {
- CDCJobConfiguration jobConfig = new
PipelineJobConfigurationManager(jobType).getJobConfiguration(jobId);
+ CDCJobConfiguration jobConfig = new
PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(jobId);
ShardingSphereDatabase database =
PipelineContextManager.getProxyContext().getMetaDataContexts().getMetaData().getDatabase(jobConfig.getDatabaseName());
Collection<CDCJobItemInfo> result = new LinkedList<>();
for (TransmissionJobItemInfo each : new
TransmissionJobManager(jobType).getJobItemInfos(jobId)) {
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index dac670c5455..e129229e4b7 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -75,7 +75,7 @@ public final class CDCBackendHandler {
private final CDCJobAPI jobAPI = (CDCJobAPI)
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
- private final PipelineJobConfigurationManager jobConfigManager = new
PipelineJobConfigurationManager(new CDCJobType());
+ private final PipelineJobConfigurationManager jobConfigManager = new
PipelineJobConfigurationManager(new CDCJobType().getOption());
/**
* Get database name by job ID.
diff --git
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
index 0d99f34600f..859db4bedcc 100644
---
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
@@ -70,7 +70,7 @@ public final class ConsistencyCheckJobAPI {
public ConsistencyCheckJobAPI(final ConsistencyCheckJobType jobType) {
progressSwapper = (YamlConsistencyCheckJobItemProgressSwapper)
jobType.getOption().getYamlJobItemProgressSwapper();
jobManager = new PipelineJobManager(jobType);
- jobConfigManager = new PipelineJobConfigurationManager(jobType);
+ jobConfigManager = new
PipelineJobConfigurationManager(jobType.getOption());
jobItemManager = new PipelineJobItemManager<>(progressSwapper);
}
diff --git
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 50b84d63eca..4d7951d4b66 100644
---
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -107,7 +107,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
protected void runBlocking() {
jobItemManager.persistProgress(jobItemContext);
PipelineJobType jobType =
PipelineJobIdUtils.parseJobType(parentJobId);
- PipelineJobConfiguration parentJobConfig = new
PipelineJobConfigurationManager(jobType).getJobConfiguration(parentJobId);
+ PipelineJobConfiguration parentJobConfig = new
PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(parentJobId);
try {
PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.fillInDefaultValue(
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(parentJobConfig.getJobId()),
jobType.getType()));
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
index da4abe3186e..6005b214b6e 100644
---
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
+++
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
@@ -48,7 +48,7 @@ public final class MigrationJobType implements
PipelineJobType {
@Override
public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
Collection<String> sourceTables = new LinkedList<>();
- new PipelineJobConfigurationManager(new
MigrationJobType()).<MigrationJobConfiguration>getJobConfiguration(jobMetaData.getJobId()).getJobShardingDataNodes()
+ new PipelineJobConfigurationManager(new
MigrationJobType().getOption()).<MigrationJobConfiguration>getJobConfiguration(jobMetaData.getJobId()).getJobShardingDataNodes()
.forEach(each -> each.getEntries().forEach(entry ->
entry.getDataNodes().forEach(dataNode -> sourceTables.add(dataNode.format()))));
return new PipelineJobInfo(jobMetaData, null, String.join(",",
sourceTables));
}
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 7745861d7a7..f08ab2bdf62 100644
---
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -99,7 +99,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
public MigrationJobAPI() {
PipelineJobType jobType = new MigrationJobType();
jobManager = new PipelineJobManager(jobType);
- jobConfigManager = new PipelineJobConfigurationManager(jobType);
+ jobConfigManager = new
PipelineJobConfigurationManager(jobType.getOption());
dataSourcePersistService = new PipelineDataSourcePersistService();
}
@@ -326,7 +326,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
}
private void cleanTempTableOnRollback(final String jobId) throws
SQLException {
- MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class,
getType())).getJobConfiguration(jobId);
+ MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class,
getType()).getOption()).getJobConfiguration(jobId);
PipelinePrepareSQLBuilder pipelineSQLBuilder = new
PipelinePrepareSQLBuilder(jobConfig.getTargetDatabaseType());
TableAndSchemaNameMapper mapping = new
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
try (
diff --git
a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/CheckMigrationJobExecutor.java
b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/CheckMigrationJobExecutor.java
index 4d508787adc..26575f31a8e 100644
---
a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/CheckMigrationJobExecutor.java
+++
b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/CheckMigrationJobExecutor.java
@@ -51,7 +51,7 @@ public final class CheckMigrationJobExecutor implements
DistSQLUpdateExecutor<Ch
String algorithmTypeName = null == typeStrategy ? null :
typeStrategy.getName();
Properties algorithmProps = null == typeStrategy ? null :
typeStrategy.getProps();
String jobId = sqlStatement.getJobId();
- MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(migrationJobType).getJobConfiguration(jobId);
+ MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(migrationJobType.getOption()).getJobConfiguration(jobId);
verifyInventoryFinished(jobConfig);
checkJobAPI.start(new CreateConsistencyCheckJobParameter(jobId,
algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(),
jobConfig.getTargetDatabaseType()));
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
index 0cab811477d..aa9ee65bc18 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
@@ -74,7 +74,7 @@ class ConsistencyCheckJobAPITest {
String parentJobId = parentJobConfig.getJobId();
String checkJobId = jobAPI.start(new
CreateConsistencyCheckJobParameter(parentJobId, null, null,
parentJobConfig.getSourceDatabaseType(),
parentJobConfig.getTargetDatabaseType()));
- ConsistencyCheckJobConfiguration checkJobConfig = new
PipelineJobConfigurationManager(jobType).getJobConfiguration(checkJobId);
+ ConsistencyCheckJobConfiguration checkJobConfig = new
PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(checkJobId);
int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
String expectCheckJobId = PipelineJobIdUtils.marshal(new
ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId),
parentJobId, expectedSequence));
assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
@@ -240,7 +240,7 @@ class ConsistencyCheckJobAPITest {
}
private void persistCheckJobProgress(final ConsistencyCheckJobItemProgress
checkJobItemProgress, final String checkJobId, final JobStatus jobStatus, final
int recordCount) {
- ConsistencyCheckJobConfiguration checkJobConfig = new
PipelineJobConfigurationManager(jobType).getJobConfiguration(checkJobId);
+ ConsistencyCheckJobConfiguration checkJobConfig = new
PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(checkJobId);
ConsistencyCheckJobItemContext checkJobItemContext = new
ConsistencyCheckJobItemContext(checkJobConfig, 0, jobStatus,
checkJobItemProgress);
LocalDateTime checkBeginTime = new
Timestamp(checkJobItemContext.getProgressContext().getCheckBeginTimeMillis()).toLocalDateTime();
checkJobItemContext.getProgressContext().setRecordsCount(recordCount);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
index aa807dbb132..ac760942004 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
@@ -113,7 +113,7 @@ class MigrationJobAPITest {
PipelineContextUtils.initPipelineContextManager();
jobType = new MigrationJobType();
jobAPI = (MigrationJobAPI)
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
- jobConfigManager = new PipelineJobConfigurationManager(jobType);
+ jobConfigManager = new
PipelineJobConfigurationManager(jobType.getOption());
jobManager = new PipelineJobManager(jobType);
transmissionJobManager = new TransmissionJobManager(jobType);
jobItemManager = new
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());