This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new acd872e114c Merge PipelineJobType and PipelineJobOption (#29270)
acd872e114c is described below
commit acd872e114c7c3202d644b04ae28e0605c02e488
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 3 22:14:16 2023 +0800
Merge PipelineJobType and PipelineJobOption (#29270)
* Merge PipelineJobType and PipelineJobOption
* Merge PipelineJobType and PipelineJobOption
---
.../pipeline/common/job/type/PipelineJobType.java | 91 ++++++++++++++-
.../pipeline/core/job/AbstractPipelineJob.java | 5 +-
.../core/job/AbstractSimplePipelineJob.java | 2 +-
.../core/job/option/PipelineJobOption.java | 125 ---------------------
.../persist/PipelineJobProgressPersistService.java | 4 +-
.../service/PipelineJobConfigurationManager.java | 10 +-
.../core/job/service/PipelineJobManager.java | 23 ++--
.../core/job/service/TransmissionJobManager.java | 10 +-
.../core/task/runner/TransmissionTasksRunner.java | 11 +-
.../pipeline/common/job/type/FixtureJobType.java | 39 ++++++-
.../query/ShowStreamingJobStatusExecutor.java | 4 +-
.../handler/query/ShowStreamingListExecutor.java | 4 +-
.../query/ShowMigrationCheckStatusExecutor.java | 4 +-
.../query/ShowMigrationJobStatusExecutor.java | 4 +-
.../handler/query/ShowMigrationListExecutor.java | 4 +-
.../handler/update/CheckMigrationJobUpdater.java | 13 ++-
.../handler/update/DropMigrationCheckUpdater.java | 4 +-
.../handler/update/StartMigrationCheckUpdater.java | 4 +-
.../handler/update/StartMigrationUpdater.java | 4 +-
.../handler/update/StopMigrationCheckUpdater.java | 4 +-
.../handler/update/StopMigrationUpdater.java | 6 +-
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 7 +-
.../data/pipeline/cdc/CDCJobOption.java | 79 -------------
.../data/pipeline/cdc/CDCJobType.java | 46 +++++++-
.../data/pipeline/cdc/api/CDCJobAPI.java | 16 +--
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 6 +-
.../pipeline/cdc/handler/CDCBackendHandler.java | 8 +-
.../consistencycheck/ConsistencyCheckJob.java | 2 +-
.../ConsistencyCheckJobOption.java | 71 ------------
.../consistencycheck/ConsistencyCheckJobType.java | 40 ++++++-
.../api/ConsistencyCheckJobAPI.java | 10 +-
.../task/ConsistencyCheckTasksRunner.java | 16 ++-
.../pipeline/scenario/migration/MigrationJob.java | 6 +-
.../scenario/migration/MigrationJobOption.java | 92 ---------------
.../scenario/migration/MigrationJobType.java | 59 +++++++++-
.../scenario/migration/api/MigrationJobAPI.java | 11 +-
.../MigrationDataConsistencyChecker.java | 4 +-
.../migration/prepare/MigrationJobPreparer.java | 8 +-
.../updatable/AlterTransmissionRuleUpdater.java | 2 +-
.../api/impl/ConsistencyCheckJobAPITest.java | 10 +-
.../migration/api/impl/MigrationJobAPITest.java | 21 ++--
41 files changed, 376 insertions(+), 513 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java
index 4781e9c4552..77689ff3f37 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java
@@ -17,9 +17,21 @@
package org.apache.shardingsphere.data.pipeline.common.job.type;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
+import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
+import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+
+import java.util.Optional;
/**
* Pipeline job type.
@@ -35,11 +47,82 @@ public interface PipelineJobType extends TypedSPI {
String getCode();
/**
- * Get job option.
+ * Get YAML pipeline job configuration swapper.
+ *
+ * @param <T> type of YAML configuration
+ * @param <Y> type of pipeline job configuration
+ * @return YAML pipeline job configuration swapper
+ */
+ <Y extends YamlConfiguration, T extends PipelineJobConfiguration>
YamlPipelineJobConfigurationSwapper<Y, T> getYamlJobConfigurationSwapper();
+
+ /**
+ * Get YAML pipeline job item progress swapper.
+ *
+ * @param <T> type of pipeline job item progress
+ * @return YAML pipeline job item progress swapper
+ */
+ <T extends PipelineJobItemProgress>
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T>
getYamlJobItemProgressSwapper();
+
+ /**
+ * Get pipeline job class.
+ *
+ * @return pipeline job class
+ */
+ Class<? extends PipelineJob> getJobClass();
+
+ /**
+ * Whether to ignore to start disabled job when job item progress is
finished.
+ *
+ * @return ignore to start disabled job when job item progress is finished
or not
+ */
+ default boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
+ return false;
+ }
+
+ /**
+ * Get to be start disabled next job type.
+ *
+ * @return to be start disabled next job type
+ */
+ default Optional<String> getToBeStartDisabledNextJobType() {
+ return Optional.empty();
+ }
+
+ /**
+ * Get to be stopped previous job type.
+ *
+ * @return to be stopped previous job type
+ */
+ default Optional<String> getToBeStoppedPreviousJobType() {
+ return Optional.empty();
+ }
+
+ /**
+ * Whether to force no sharding when convert to job configuration POJO.
+ *
+ * @return without sharding or not
+ */
+ default boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
+ return false;
+ }
+
+ /**
+ * Get pipeline job info.
+ *
+ * @param jobId job ID
+ * @return pipeline job info
+ */
+ PipelineJobInfo getJobInfo(String jobId);
+
+ /**
+ * Build pipeline data consistency checker.
*
- * @return job option
+ * @param jobConfig job configuration
+ * @param processContext process context
+ * @param progressContext consistency check job item progress context
+ * @return all logic tables check result
*/
- PipelineJobOption getOption();
+ PipelineDataConsistencyChecker
buildDataConsistencyChecker(PipelineJobConfiguration jobConfig,
TransmissionProcessContext processContext,
ConsistencyCheckJobItemProgressContext progressContext);
@Override
String getType();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 8cf5e7ea051..95b83c64372 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -27,7 +27,6 @@ import
org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJo
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
@@ -56,7 +55,7 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
private final String jobId;
@Getter(AccessLevel.PROTECTED)
- private final PipelineJobOption jobOption;
+ private final PipelineJobType jobType;
private final AtomicBoolean stopping = new AtomicBoolean(false);
@@ -66,7 +65,7 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
protected AbstractPipelineJob(final String jobId) {
this.jobId = jobId;
- jobOption = TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobId).getType()).getOption();
+ jobType = TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobId).getType());
}
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 41ed6e7f77e..92a5329850d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -61,7 +61,7 @@ public abstract class AbstractSimplePipelineJob extends
AbstractPipelineJob impl
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
- processFailed(new PipelineJobManager(getJobOption()), jobId,
shardingItem, ex);
+ processFailed(new PipelineJobManager(getJobType()), jobId,
shardingItem, ex);
throw ex;
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
deleted file mode 100644
index 6b3d4b71e19..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job.option;
-
-import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
-import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
-import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
-
-import java.util.Optional;
-
-/**
- * Pipeline job option.
- */
-@SingletonSPI
-public interface PipelineJobOption {
-
- /**
- * Get YAML pipeline job configuration swapper.
- *
- * @param <T> type of YAML configuration
- * @param <Y> type of pipeline job configuration
- * @return YAML pipeline job configuration swapper
- */
- <Y extends YamlConfiguration, T extends PipelineJobConfiguration>
YamlPipelineJobConfigurationSwapper<Y, T> getYamlJobConfigurationSwapper();
-
- /**
- * Get YAML pipeline job item progress swapper.
- *
- * @param <T> type of pipeline job item progress
- * @return YAML pipeline job item progress swapper
- */
- <T extends PipelineJobItemProgress>
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T>
getYamlJobItemProgressSwapper();
-
- /**
- * Get pipeline job class.
- *
- * @return pipeline job class
- */
- Class<? extends PipelineJob> getJobClass();
-
- /**
- * Whether to ignore to start disabled job when job item progress is
finished.
- *
- * @return ignore to start disabled job when job item progress is finished
or not
- */
- default boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
- return false;
- }
-
- /**
- * Get to be start disabled next job type.
- *
- * @return to be start disabled next job type
- */
- default Optional<String> getToBeStartDisabledNextJobType() {
- return Optional.empty();
- }
-
- /**
- * Get to be stopped previous job type.
- *
- * @return to be stopped previous job type
- */
- default Optional<String> getToBeStoppedPreviousJobType() {
- return Optional.empty();
- }
-
- /**
- * Whether to force no sharding when convert to job configuration POJO.
- *
- * @return without sharding or not
- */
- default boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
- return false;
- }
-
- /**
- * Get pipeline job info.
- *
- * @param jobId job ID
- * @return pipeline job info
- */
- PipelineJobInfo getJobInfo(String jobId);
-
- /**
- * Build pipeline data consistency checker.
- *
- * @param jobConfig job configuration
- * @param processContext process context
- * @param progressContext consistency check job item progress context
- * @return all logic tables check result
- */
- PipelineDataConsistencyChecker
buildDataConsistencyChecker(PipelineJobConfiguration jobConfig,
TransmissionProcessContext processContext,
ConsistencyCheckJobItemProgressContext progressContext);
-
- /**
- * Get job type.
- *
- * @return job type
- */
- String getType();
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 26c95849c91..45c7afc6755 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -130,8 +130,8 @@ public final class PipelineJobProgressPersistService {
}
persistContext.getHasNewEvents().set(false);
long startTimeMillis = System.currentTimeMillis();
- new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobId).getType()).getOption()
-
.getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
+ new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
+
PipelineJobIdUtils.parseJobType(jobId).getType()).getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
persistContext.getBeforePersistingProgressMillis().set(null);
if (6 == ThreadLocalRandom.current().nextInt(100)) {
log.info("persist, jobId={}, shardingItem={}, cost {} ms",
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
index a6c49fcb965..5d4b8878308 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
@@ -19,9 +19,9 @@ package
org.apache.shardingsphere.data.pipeline.core.job.service;
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -35,7 +35,7 @@ import java.util.Collections;
@RequiredArgsConstructor
public final class PipelineJobConfigurationManager {
- private final PipelineJobOption jobOption;
+ private final PipelineJobType jobType;
/**
* Get job configuration.
@@ -46,7 +46,7 @@ public final class PipelineJobConfigurationManager {
*/
@SuppressWarnings("unchecked")
public <T extends PipelineJobConfiguration> T getJobConfiguration(final
String jobId) {
- return (T)
jobOption.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
+ return (T)
jobType.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
}
/**
@@ -58,8 +58,8 @@ public final class PipelineJobConfigurationManager {
public JobConfigurationPOJO convertToJobConfigurationPOJO(final
PipelineJobConfiguration jobConfig) {
JobConfigurationPOJO result = new JobConfigurationPOJO();
result.setJobName(jobConfig.getJobId());
-
result.setShardingTotalCount(jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO()
? 1 : jobConfig.getJobShardingCount());
-
result.setJobParameter(YamlEngine.marshal(jobOption.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
+
result.setShardingTotalCount(jobType.isForceNoShardingWhenConvertToJobConfigurationPOJO()
? 1 : jobConfig.getJobShardingCount());
+
result.setJobParameter(YamlEngine.marshal(jobType.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
String createTimeFormat =
LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter());
result.getProps().setProperty("create_time", createTimeFormat);
result.getProps().setProperty("start_time_millis",
String.valueOf(System.currentTimeMillis()));
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index a00564b5bbc..2d98b49be6a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -32,7 +32,6 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCre
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -52,7 +51,7 @@ import java.util.stream.Collectors;
@Slf4j
public final class PipelineJobManager {
- private final PipelineJobOption jobOption;
+ private final PipelineJobType jobType;
/**
* Start job.
@@ -67,8 +66,8 @@ public final class PipelineJobManager {
log.warn("jobId already exists in registry center, ignore, job id
is `{}`", jobId);
return;
}
- governanceFacade.getJobFacade().getJob().create(jobId,
jobOption.getJobClass());
- governanceFacade.getJobFacade().getConfiguration().persist(jobId, new
PipelineJobConfigurationManager(jobOption).convertToJobConfigurationPOJO(jobConfig));
+ governanceFacade.getJobFacade().getJob().create(jobId,
jobType.getJobClass());
+ governanceFacade.getJobFacade().getConfiguration().persist(jobId, new
PipelineJobConfigurationManager(jobType).convertToJobConfigurationPOJO(jobConfig));
}
/**
@@ -77,15 +76,15 @@ public final class PipelineJobManager {
* @param jobId job id
*/
public void resume(final String jobId) {
- if
(jobOption.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
- Optional<? extends PipelineJobItemProgress> jobItemProgress = new
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper()).getProgress(jobId,
0);
+ if (jobType.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished())
{
+ Optional<? extends PipelineJobItemProgress> jobItemProgress = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()).getProgress(jobId,
0);
if (jobItemProgress.isPresent() && JobStatus.FINISHED ==
jobItemProgress.get().getStatus()) {
log.info("job status is FINISHED, ignore, jobId={}", jobId);
return;
}
}
startCurrentDisabledJob(jobId);
- jobOption.getToBeStartDisabledNextJobType().ifPresent(optional ->
startNextDisabledJob(jobId, optional));
+ jobType.getToBeStartDisabledNextJobType().ifPresent(optional ->
startNextDisabledJob(jobId, optional));
}
@@ -108,7 +107,7 @@ public final class PipelineJobManager {
private void startNextDisabledJob(final String jobId, final String
toBeStartDisabledNextJobType) {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().findLatestCheckJobId(jobId).ifPresent(optional
-> {
try {
- new
PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class,
toBeStartDisabledNextJobType).getOption()).resume(optional);
+ new
PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class,
toBeStartDisabledNextJobType)).resume(optional);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -123,14 +122,14 @@ public final class PipelineJobManager {
* @param jobId job id
*/
public void stop(final String jobId) {
- jobOption.getToBeStoppedPreviousJobType().ifPresent(optional ->
stopPreviousJob(jobId, optional));
+ jobType.getToBeStoppedPreviousJobType().ifPresent(optional ->
stopPreviousJob(jobId, optional));
stopCurrentJob(jobId);
}
private void stopPreviousJob(final String jobId, final String
toBeStoppedPreviousJobType) {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().findLatestCheckJobId(jobId).ifPresent(optional
-> {
try {
- new
PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class,
toBeStoppedPreviousJobType).getOption()).stop(optional);
+ new
PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class,
toBeStoppedPreviousJobType)).stop(optional);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -175,8 +174,8 @@ public final class PipelineJobManager {
public List<PipelineJobInfo> getJobInfos(final PipelineContextKey
contextKey) {
try {
return
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream()
- .filter(each -> !each.getJobName().startsWith("_") &&
jobOption.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
- .map(each ->
jobOption.getJobInfo(each.getJobName())).collect(Collectors.toList());
+ .filter(each -> !each.getJobName().startsWith("_") &&
jobType.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
+ .map(each ->
jobType.getJobInfo(each.getJobName())).collect(Collectors.toList());
} catch (final UnsupportedOperationException ex) {
return Collections.emptyList();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index 842a84f2a6e..0be041dafd0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -21,11 +21,11 @@ import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import java.util.Collection;
@@ -43,7 +43,7 @@ import java.util.stream.IntStream;
@RequiredArgsConstructor
public final class TransmissionJobManager {
- private final PipelineJobOption jobOption;
+ private final PipelineJobType jobType;
/**
* Get job infos.
@@ -52,11 +52,11 @@ public final class TransmissionJobManager {
* @return job item infos
*/
public Collection<TransmissionJobItemInfo> getJobItemInfos(final String
jobId) {
- PipelineJobConfiguration jobConfig = new
PipelineJobConfigurationManager(jobOption).getJobConfiguration(jobId);
+ PipelineJobConfiguration jobConfig = new
PipelineJobConfigurationManager(jobType).getJobConfiguration(jobId);
long startTimeMillis =
Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
Map<Integer, TransmissionJobItemProgress> jobProgress =
getJobProgress(jobConfig);
List<TransmissionJobItemInfo> result = new LinkedList<>();
- PipelineJobInfo jobInfo = jobOption.getJobInfo(jobId);
+ PipelineJobInfo jobInfo = jobType.getJobInfo(jobId);
for (Entry<Integer, TransmissionJobItemProgress> entry :
jobProgress.entrySet()) {
int shardingItem = entry.getKey();
TransmissionJobItemProgress jobItemProgress = entry.getValue();
@@ -88,7 +88,7 @@ public final class TransmissionJobManager {
* @return each sharding item progress
*/
public Map<Integer, TransmissionJobItemProgress> getJobProgress(final
PipelineJobConfiguration jobConfig) {
- PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
+ PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
String jobId = jobConfig.getJobId();
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
return IntStream.range(0,
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map,
each) -> {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
index bd4fca50425..30b6dd5351c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
@@ -29,7 +29,6 @@ import
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJ
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
@@ -57,7 +56,7 @@ public class TransmissionTasksRunner implements
PipelineTasksRunner {
private final Collection<PipelineTask> incrementalTasks;
- private final PipelineJobOption jobOption;
+ private final PipelineJobType jobType;
private final PipelineJobManager jobManager;
@@ -67,9 +66,9 @@ public class TransmissionTasksRunner implements
PipelineTasksRunner {
this.jobItemContext = jobItemContext;
inventoryTasks = jobItemContext.getInventoryTasks();
incrementalTasks = jobItemContext.getIncrementalTasks();
- jobOption = TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption();
- jobManager = new PipelineJobManager(jobOption);
- jobItemManager = new
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
+ jobType = TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
+ jobManager = new PipelineJobManager(jobType);
+ jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
}
@Override
@@ -90,7 +89,7 @@ public class TransmissionTasksRunner implements
PipelineTasksRunner {
if (jobItemContext.isStopping()) {
return;
}
- new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption()
+ new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
.getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
if
(PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
log.info("All inventory tasks finished.");
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java
index d4de294616f..f2d143a71f6 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java
@@ -17,9 +17,17 @@
package org.apache.shardingsphere.data.pipeline.common.job.type;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
-
-import static org.mockito.Mockito.mock;
+import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
+import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
+import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
/**
* Fixture job type.
@@ -32,8 +40,29 @@ public final class FixtureJobType implements PipelineJobType
{
}
@Override
- public PipelineJobOption getOption() {
- return mock(PipelineJobOption.class);
+ public <Y extends YamlConfiguration, T extends PipelineJobConfiguration>
YamlPipelineJobConfigurationSwapper<Y, T> getYamlJobConfigurationSwapper() {
+ return null;
+ }
+
+ @Override
+ public <T extends PipelineJobItemProgress>
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T>
getYamlJobItemProgressSwapper() {
+ return null;
+ }
+
+ @Override
+ public Class<? extends PipelineJob> getJobClass() {
+ return null;
+ }
+
+ @Override
+ public PipelineJobInfo getJobInfo(final String jobId) {
+ return null;
+ }
+
+ @Override
+ public PipelineDataConsistencyChecker buildDataConsistencyChecker(final
PipelineJobConfiguration jobConfig,
+ final
TransmissionProcessContext processContext, final
ConsistencyCheckJobItemProgressContext progressContext) {
+ return null;
}
@Override
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
index 7bb2e0f4348..223d8c5a784 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.cdc.distsql.handler.query;
import
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption;
+import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
@@ -37,7 +37,7 @@ public final class ShowStreamingJobStatusExecutor implements
QueryableRALExecuto
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowStreamingStatusStatement sqlStatement) {
- Collection<TransmissionJobItemInfo> jobItemInfos = new
TransmissionJobManager(new
CDCJobOption()).getJobItemInfos(sqlStatement.getJobId());
+ Collection<TransmissionJobItemInfo> jobItemInfos = new
TransmissionJobManager(new
CDCJobType()).getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> getRow(each,
currentTimeMillis)).collect(Collectors.toList());
}
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java
index 63c52219df3..c69d02d4678 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.cdc.distsql.handler.query;
import
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption;
+import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -35,7 +35,7 @@ import java.util.stream.Collectors;
*/
public final class ShowStreamingListExecutor implements
QueryableRALExecutor<ShowStreamingListStatement> {
- private final PipelineJobManager pipelineJobManager = new
PipelineJobManager(new CDCJobOption());
+ private final PipelineJobManager pipelineJobManager = new
PipelineJobManager(new CDCJobType());
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowStreamingListStatement sqlStatement) {
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
index 4ad92f6161e..b63898a8033 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
@@ -18,8 +18,8 @@
package
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.query;
import
org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
-import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
@@ -35,7 +35,7 @@ import java.util.Optional;
*/
public final class ShowMigrationCheckStatusExecutor implements
QueryableRALExecutor<ShowMigrationCheckStatusStatement> {
- private final ConsistencyCheckJobAPI jobAPI = new
ConsistencyCheckJobAPI(new ConsistencyCheckJobOption());
+ private final ConsistencyCheckJobAPI jobAPI = new
ConsistencyCheckJobAPI(new ConsistencyCheckJobType());
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowMigrationCheckStatusStatement sqlStatement) {
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
index 2c518b1e648..38f2bcdf217 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.query;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
@@ -37,7 +37,7 @@ public final class ShowMigrationJobStatusExecutor implements
QueryableRALExecuto
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowMigrationStatusStatement sqlStatement) {
- Collection<TransmissionJobItemInfo> jobItemInfos = new
TransmissionJobManager(new
MigrationJobOption()).getJobItemInfos(sqlStatement.getJobId());
+ Collection<TransmissionJobItemInfo> jobItemInfos = new
TransmissionJobManager(new
MigrationJobType()).getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> getRow(each,
currentTimeMillis)).collect(Collectors.toList());
}
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationListExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationListExecutor.java
index 156dae257b0..a3e9f8cffff 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationListExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationListExecutor.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.query;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
@@ -34,7 +34,7 @@ import java.util.stream.Collectors;
*/
public final class ShowMigrationListExecutor implements
QueryableRALExecutor<ShowMigrationListStatement> {
- private final PipelineJobManager pipelineJobManager = new
PipelineJobManager(new MigrationJobOption());
+ private final PipelineJobManager pipelineJobManager = new
PipelineJobManager(new MigrationJobType());
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowMigrationListStatement sqlStatement) {
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/CheckMigrationJobUpdater.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/CheckMigrationJobUpdater.java
index 5b669b4ba18..cc0b69e236f 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/CheckMigrationJobUpdater.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/CheckMigrationJobUpdater.java
@@ -17,14 +17,15 @@
package
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
-import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.CreateConsistencyCheckJobParameter;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
@@ -39,9 +40,9 @@ import java.util.Properties;
*/
public final class CheckMigrationJobUpdater implements
RALUpdater<CheckMigrationStatement> {
- private final ConsistencyCheckJobAPI checkJobAPI = new
ConsistencyCheckJobAPI(new ConsistencyCheckJobOption());
+ private final ConsistencyCheckJobAPI checkJobAPI = new
ConsistencyCheckJobAPI(new ConsistencyCheckJobType());
- private final MigrationJobOption migrationJobOption = new
MigrationJobOption();
+ private final PipelineJobType migrationJobType = new MigrationJobType();
@Override
public void executeUpdate(final String databaseName, final
CheckMigrationStatement sqlStatement) throws SQLException {
@@ -49,13 +50,13 @@ public final class CheckMigrationJobUpdater implements
RALUpdater<CheckMigration
String algorithmTypeName = null == typeStrategy ? null :
typeStrategy.getName();
Properties algorithmProps = null == typeStrategy ? null :
typeStrategy.getProps();
String jobId = sqlStatement.getJobId();
- MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(migrationJobOption).getJobConfiguration(jobId);
+ MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(migrationJobType).getJobConfiguration(jobId);
verifyInventoryFinished(jobConfig);
checkJobAPI.start(new CreateConsistencyCheckJobParameter(jobId,
algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(),
jobConfig.getTargetDatabaseType()));
}
private void verifyInventoryFinished(final MigrationJobConfiguration
jobConfig) {
- TransmissionJobManager transmissionJobManager = new
TransmissionJobManager(migrationJobOption);
+ TransmissionJobManager transmissionJobManager = new
TransmissionJobManager(migrationJobType);
ShardingSpherePreconditions.checkState(PipelineJobProgressDetector.isInventoryFinished(jobConfig.getJobShardingCount(),
transmissionJobManager.getJobProgress(jobConfig).values()),
() -> new PipelineInvalidParameterException("Inventory is not
finished."));
}
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/DropMigrationCheckUpdater.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/DropMigrationCheckUpdater.java
index 8b820291d38..da0690d1538 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/DropMigrationCheckUpdater.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/DropMigrationCheckUpdater.java
@@ -17,8 +17,8 @@
package
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
-import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import
org.apache.shardingsphere.migration.distsql.statement.DropMigrationCheckStatement;
@@ -27,7 +27,7 @@ import
org.apache.shardingsphere.migration.distsql.statement.DropMigrationCheckS
*/
public final class DropMigrationCheckUpdater implements
RALUpdater<DropMigrationCheckStatement> {
- private final ConsistencyCheckJobAPI jobAPI = new
ConsistencyCheckJobAPI(new ConsistencyCheckJobOption());
+ private final ConsistencyCheckJobAPI jobAPI = new
ConsistencyCheckJobAPI(new ConsistencyCheckJobType());
@Override
public void executeUpdate(final String databaseName, final
DropMigrationCheckStatement sqlStatement) {
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationCheckUpdater.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationCheckUpdater.java
index b34f4274b33..189eec12237 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationCheckUpdater.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationCheckUpdater.java
@@ -17,8 +17,8 @@
package
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
-import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import
org.apache.shardingsphere.migration.distsql.statement.StartMigrationCheckStatement;
@@ -27,7 +27,7 @@ import
org.apache.shardingsphere.migration.distsql.statement.StartMigrationCheck
*/
public final class StartMigrationCheckUpdater implements
RALUpdater<StartMigrationCheckStatement> {
- private final ConsistencyCheckJobAPI jobAPI = new
ConsistencyCheckJobAPI(new ConsistencyCheckJobOption());
+ private final ConsistencyCheckJobAPI jobAPI = new
ConsistencyCheckJobAPI(new ConsistencyCheckJobType());
@Override
public void executeUpdate(final String databaseName, final
StartMigrationCheckStatement sqlStatement) {
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationUpdater.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationUpdater.java
index 209b62587f3..816397e0c52 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationUpdater.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationUpdater.java
@@ -18,7 +18,7 @@
package
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import
org.apache.shardingsphere.migration.distsql.statement.StartMigrationStatement;
@@ -27,7 +27,7 @@ import
org.apache.shardingsphere.migration.distsql.statement.StartMigrationState
*/
public final class StartMigrationUpdater implements
RALUpdater<StartMigrationStatement> {
- private final PipelineJobManager jobManager = new PipelineJobManager(new
MigrationJobOption());
+ private final PipelineJobManager jobManager = new PipelineJobManager(new
MigrationJobType());
@Override
public void executeUpdate(final String databaseName, final
StartMigrationStatement sqlStatement) {
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationCheckUpdater.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationCheckUpdater.java
index c809ecb2743..e3059524082 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationCheckUpdater.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationCheckUpdater.java
@@ -17,8 +17,8 @@
package
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
-import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import
org.apache.shardingsphere.migration.distsql.statement.StopMigrationCheckStatement;
@@ -27,7 +27,7 @@ import
org.apache.shardingsphere.migration.distsql.statement.StopMigrationCheckS
*/
public final class StopMigrationCheckUpdater implements
RALUpdater<StopMigrationCheckStatement> {
- private final ConsistencyCheckJobAPI jobAPI = new
ConsistencyCheckJobAPI(new ConsistencyCheckJobOption());
+ private final ConsistencyCheckJobAPI jobAPI = new
ConsistencyCheckJobAPI(new ConsistencyCheckJobType());
@Override
public void executeUpdate(final String databaseName, final
StopMigrationCheckStatement sqlStatement) {
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationUpdater.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationUpdater.java
index fa11b738eb4..8dea2425a2c 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationUpdater.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationUpdater.java
@@ -18,7 +18,7 @@
package
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import
org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatement;
@@ -27,9 +27,7 @@ import
org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatem
*/
public final class StopMigrationUpdater implements
RALUpdater<StopMigrationStatement> {
- private final MigrationJobOption jobOption = new MigrationJobOption();
-
- private final PipelineJobManager jobManager = new
PipelineJobManager(jobOption);
+ private final PipelineJobManager jobManager = new PipelineJobManager(new
MigrationJobType());
@Override
public void executeUpdate(final String databaseName, final
StopMigrationStatement sqlStatement) {
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 5da8e693eaa..3cf6df2f069 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -46,6 +46,7 @@ import
org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
@@ -85,11 +86,11 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
@Getter
private final PipelineSink sink;
- private final CDCJobOption jobOption = new CDCJobOption();
+ private final PipelineJobType jobType =
TypedSPILoader.getService(PipelineJobType.class, "STREAMING");
private final CDCJobAPI jobAPI = (CDCJobAPI)
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
- private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
+ private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
private final PipelineProcessConfigurationPersistService
processConfigPersistService = new PipelineProcessConfigurationPersistService();
@@ -134,7 +135,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration
jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress =
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.convertWithDefaultValue(
-
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
jobOption.getType()));
+
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
jobType.getType()));
TransmissionProcessContext jobProcessContext = new
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig,
shardingItem, jobProcessContext.getPipelineProcessConfig());
return new CDCJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager,
sink);
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
deleted file mode 100644
index d2e6854e755..00000000000
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc;
-
-import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
-import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
-
-/**
- * CDC job option.
- */
-@Slf4j
-public final class CDCJobOption implements PipelineJobOption {
-
- @SuppressWarnings("unchecked")
- @Override
- public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
- return new YamlCDCJobConfigurationSwapper();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public YamlTransmissionJobItemProgressSwapper
getYamlJobItemProgressSwapper() {
- return new YamlTransmissionJobItemProgressSwapper();
- }
-
- @Override
- public Class<CDCJob> getJobClass() {
- return CDCJob.class;
- }
-
- @Override
- public boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
- return true;
- }
-
- @Override
- public PipelineJobInfo getJobInfo(final String jobId) {
- PipelineJobMetaData jobMetaData = new
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
- CDCJobConfiguration jobConfig = new
PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
- return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(),
String.join(", ", jobConfig.getSchemaTableNames()));
- }
-
- @Override
- public PipelineDataConsistencyChecker buildDataConsistencyChecker(final
PipelineJobConfiguration jobConfig, final TransmissionProcessContext
processContext,
- final
ConsistencyCheckJobItemProgressContext progressContext) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String getType() {
- return "STREAMING";
- }
-}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
index 654f878dca2..509f24e6c20 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
@@ -17,8 +17,18 @@
package org.apache.shardingsphere.data.pipeline.cdc;
+import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
+import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
/**
* CDC job type.
@@ -30,9 +40,39 @@ public final class CDCJobType implements PipelineJobType {
return "03";
}
+ @SuppressWarnings("unchecked")
@Override
- public PipelineJobOption getOption() {
- return new CDCJobOption();
+ public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
+ return new YamlCDCJobConfigurationSwapper();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public YamlTransmissionJobItemProgressSwapper
getYamlJobItemProgressSwapper() {
+ return new YamlTransmissionJobItemProgressSwapper();
+ }
+
+ @Override
+ public Class<CDCJob> getJobClass() {
+ return CDCJob.class;
+ }
+
+ @Override
+ public boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
+ return true;
+ }
+
+ @Override
+ public PipelineJobInfo getJobInfo(final String jobId) {
+ PipelineJobMetaData jobMetaData = new
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ CDCJobConfiguration jobConfig = new
PipelineJobConfigurationManager(new CDCJobType()).getJobConfiguration(jobId);
+ return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(),
String.join(", ", jobConfig.getSchemaTableNames()));
+ }
+
+ @Override
+ public PipelineDataConsistencyChecker buildDataConsistencyChecker(final
PipelineJobConfiguration jobConfig, final TransmissionProcessContext
processContext,
+ final
ConsistencyCheckJobItemProgressContext progressContext) {
+ throw new UnsupportedOperationException();
}
@Override
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 71c58442b9b..d24cbcc1612 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -22,7 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDa
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJobId;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption;
+import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration.YamlSinkConfiguration;
@@ -83,7 +83,7 @@ import java.util.stream.Collectors;
@Slf4j
public final class CDCJobAPI implements TransmissionJobAPI {
- private final CDCJobOption jobOption;
+ private final CDCJobType jobType;
private final PipelineJobManager jobManager;
@@ -96,9 +96,9 @@ public final class CDCJobAPI implements TransmissionJobAPI {
private final YamlPipelineDataSourceConfigurationSwapper
pipelineDataSourceConfigSwapper;
public CDCJobAPI() {
- jobOption = new CDCJobOption();
- jobManager = new PipelineJobManager(jobOption);
- jobConfigManager = new PipelineJobConfigurationManager(jobOption);
+ jobType = new CDCJobType();
+ jobManager = new PipelineJobManager(jobType);
+ jobConfigManager = new PipelineJobConfigurationManager(jobType);
dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
pipelineDataSourceConfigSwapper = new
YamlPipelineDataSourceConfigurationSwapper();
@@ -121,7 +121,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
if
(governanceFacade.getJobFacade().getConfiguration().isExisted(jobConfig.getJobId()))
{
log.warn("CDC job already exists in registry center, ignore, job
id is `{}`", jobConfig.getJobId());
} else {
-
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(),
jobOption.getJobClass());
+
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(),
jobType.getJobClass());
JobConfigurationPOJO jobConfigPOJO =
jobConfigManager.convertToJobConfigurationPOJO(jobConfig);
jobConfigPOJO.setDisabled(true);
governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(),
jobConfigPOJO);
@@ -169,7 +169,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
- PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
+ PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
try (PipelineDataSourceManager pipelineDataSourceManager = new
DefaultPipelineDataSourceManager()) {
for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
if (jobItemManager.getProgress(jobId, i).isPresent()) {
@@ -178,7 +178,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
IncrementalDumperContext dumperContext =
buildDumperContext(jobConfig, i, new
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
TransmissionJobItemProgress jobItemProgress =
getTransmissionJobItemProgress(jobConfig, pipelineDataSourceManager,
dumperContext);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist(
- jobId, i,
YamlEngine.marshal(jobOption.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
+ jobId, i,
YamlEngine.marshal(jobType.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
}
} catch (final SQLException ex) {
throw new PrepareJobWithGetBinlogPositionException(jobId, ex);
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 4cb7e57ea51..a18836b0793 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption;
+import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgressPair;
@@ -67,9 +67,7 @@ import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public final class CDCJobPreparer {
- private final CDCJobOption jobAPI = new CDCJobOption();
-
- private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+ private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(new
CDCJobType().getYamlJobItemProgressSwapper());
/**
* Do prepare work.
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 04ab6e3cb52..6270c145d80 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -21,8 +21,9 @@ import com.google.common.base.Strings;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
+import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption;
import org.apache.shardingsphere.data.pipeline.cdc.api.StreamDataParameter;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
@@ -31,7 +32,6 @@ import
org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporterManager;
import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
import
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
import
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
import
org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
@@ -48,8 +48,8 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextMan
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import
org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
@@ -75,7 +75,7 @@ public final class CDCBackendHandler {
private final CDCJobAPI jobAPI = (CDCJobAPI)
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
- private final PipelineJobConfigurationManager jobConfigManager = new
PipelineJobConfigurationManager(new CDCJobOption());
+ private final PipelineJobConfigurationManager jobConfigManager = new
PipelineJobConfigurationManager(new CDCJobType());
/**
* Get database name by job ID.
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index f3056e72746..3382e326900 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -45,7 +45,7 @@ public final class ConsistencyCheckJob extends
AbstractSimplePipelineJob {
@Override
public ConsistencyCheckJobItemContext buildPipelineJobItemContext(final
ShardingContext shardingContext) {
ConsistencyCheckJobConfiguration jobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(getJobOption().getYamlJobItemProgressSwapper());
+ PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(getJobType().getYamlJobItemProgressSwapper());
Optional<ConsistencyCheckJobItemProgress> jobItemProgress =
jobItemManager.getProgress(jobConfig.getJobId(),
shardingContext.getShardingItem());
return new ConsistencyCheckJobItemContext(jobConfig,
shardingContext.getShardingItem(), JobStatus.RUNNING,
jobItemProgress.orElse(null));
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java
deleted file mode 100644
index d5a1caaf312..00000000000
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
-
-import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
-import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
-
-/**
- * Consistency check job option.
- */
-public final class ConsistencyCheckJobOption implements PipelineJobOption {
-
- @SuppressWarnings("unchecked")
- @Override
- public YamlConsistencyCheckJobConfigurationSwapper
getYamlJobConfigurationSwapper() {
- return new YamlConsistencyCheckJobConfigurationSwapper();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public YamlConsistencyCheckJobItemProgressSwapper
getYamlJobItemProgressSwapper() {
- return new YamlConsistencyCheckJobItemProgressSwapper();
- }
-
- @Override
- public Class<ConsistencyCheckJob> getJobClass() {
- return ConsistencyCheckJob.class;
- }
-
- @Override
- public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
- return true;
- }
-
- @Override
- public PipelineJobInfo getJobInfo(final String jobId) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public PipelineDataConsistencyChecker buildDataConsistencyChecker(final
PipelineJobConfiguration jobConfig,
- final
TransmissionProcessContext processContext, final
ConsistencyCheckJobItemProgressContext progressContext) {
- return null;
- }
-
- @Override
- public String getType() {
- return "CONSISTENCY_CHECK";
- }
-}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
index 10343fb846c..2d096a2d0d6 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
@@ -17,8 +17,14 @@
package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
/**
* Consistency check job type.
@@ -30,9 +36,37 @@ public final class ConsistencyCheckJobType implements
PipelineJobType {
return "02";
}
+ @SuppressWarnings("unchecked")
@Override
- public PipelineJobOption getOption() {
- return new ConsistencyCheckJobOption();
+ public YamlConsistencyCheckJobConfigurationSwapper
getYamlJobConfigurationSwapper() {
+ return new YamlConsistencyCheckJobConfigurationSwapper();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public YamlConsistencyCheckJobItemProgressSwapper
getYamlJobItemProgressSwapper() {
+ return new YamlConsistencyCheckJobItemProgressSwapper();
+ }
+
+ @Override
+ public Class<ConsistencyCheckJob> getJobClass() {
+ return ConsistencyCheckJob.class;
+ }
+
+ @Override
+ public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
+ return true;
+ }
+
+ @Override
+ public PipelineJobInfo getJobInfo(final String jobId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PipelineDataConsistencyChecker buildDataConsistencyChecker(final
PipelineJobConfiguration jobConfig,
+ final
TransmissionProcessContext processContext, final
ConsistencyCheckJobItemProgressContext progressContext) {
+ return null;
}
@Override
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
index 4c1018a7ff1..e4b7f31ebea 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
@@ -35,7 +35,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfi
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
-import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
@@ -72,10 +72,10 @@ public final class ConsistencyCheckJobAPI {
private final PipelineJobItemManager<ConsistencyCheckJobItemProgress>
jobItemManager;
- public ConsistencyCheckJobAPI(final ConsistencyCheckJobOption jobOption) {
- progressSwapper = jobOption.getYamlJobItemProgressSwapper();
- jobManager = new PipelineJobManager(jobOption);
- jobConfigManager = new PipelineJobConfigurationManager(jobOption);
+ public ConsistencyCheckJobAPI(final ConsistencyCheckJobType jobType) {
+ progressSwapper = jobType.getYamlJobItemProgressSwapper();
+ jobManager = new PipelineJobManager(jobType);
+ jobConfigManager = new PipelineJobConfigurationManager(jobType);
jobItemManager = new PipelineJobItemManager<>(progressSwapper);
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 35accd7c08d..04937944021 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -34,13 +34,12 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDat
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -56,11 +55,11 @@ import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
- private final ConsistencyCheckJobOption jobAPI = new
ConsistencyCheckJobOption();
+ private final PipelineJobType jobType = new ConsistencyCheckJobType();
- private final PipelineJobManager jobManager = new
PipelineJobManager(jobAPI);
+ private final PipelineJobManager jobManager = new
PipelineJobManager(jobType);
- private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+ private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
private final PipelineProcessConfigurationPersistService
processConfigPersistService = new PipelineProcessConfigurationPersistService();
@@ -90,7 +89,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
if (jobItemContext.isStopping()) {
return;
}
- new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption()
+ new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
.getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
CompletableFuture<?> future =
jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor);
ExecuteEngine.trigger(Collections.singletonList(future), new
CheckExecuteCallback());
@@ -108,12 +107,11 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
protected void runBlocking() {
jobItemManager.persistProgress(jobItemContext);
PipelineJobType jobType =
PipelineJobIdUtils.parseJobType(parentJobId);
- PipelineJobOption jobOption =
TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption();
- PipelineJobConfiguration parentJobConfig = new
PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId);
+ PipelineJobConfiguration parentJobConfig = new
PipelineJobConfigurationManager(jobType).getJobConfiguration(parentJobId);
try {
PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.convertWithDefaultValue(
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(parentJobConfig.getJobId()),
jobType.getType()));
- PipelineDataConsistencyChecker checker =
jobOption.buildDataConsistencyChecker(
+ PipelineDataConsistencyChecker checker =
jobType.buildDataConsistencyChecker(
parentJobConfig, new
TransmissionProcessContext(parentJobConfig.getJobId(), processConfig),
jobItemContext.getProgressContext());
consistencyChecker.set(checker);
Map<String, TableDataConsistencyCheckResult> checkResultMap =
checker.check(checkJobConfig.getAlgorithmTypeName(),
checkJobConfig.getAlgorithmProps());
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index c405924b266..320ebb5db58 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -68,9 +68,7 @@ import java.util.stream.Collectors;
@Slf4j
public final class MigrationJob extends AbstractSimplePipelineJob {
- private final MigrationJobOption jobOption = new MigrationJobOption();
-
- private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
+ private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(new
MigrationJobType().getYamlJobItemProgressSwapper());
private final PipelineProcessConfigurationPersistService
processConfigPersistService = new PipelineProcessConfigurationPersistService();
@@ -89,7 +87,7 @@ public final class MigrationJob extends
AbstractSimplePipelineJob {
MigrationJobConfiguration jobConfig = new
YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
Optional<TransmissionJobItemProgress> initProgress =
jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.convertWithDefaultValue(
-
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
jobOption.getType()));
+
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
"MIGRATION"));
TransmissionProcessContext jobProcessContext = new
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
MigrationTaskConfiguration taskConfig =
buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
return new MigrationJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
deleted file mode 100644
index 31e36e2879b..00000000000
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
-
-import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.Optional;
-
-/**
- * Migration job option.
- */
-@Slf4j
-public final class MigrationJobOption implements PipelineJobOption {
-
- @SuppressWarnings("unchecked")
- @Override
- public YamlMigrationJobConfigurationSwapper
getYamlJobConfigurationSwapper() {
- return new YamlMigrationJobConfigurationSwapper();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public YamlTransmissionJobItemProgressSwapper
getYamlJobItemProgressSwapper() {
- return new YamlTransmissionJobItemProgressSwapper();
- }
-
- @Override
- public Class<MigrationJob> getJobClass() {
- return MigrationJob.class;
- }
-
- @Override
- public Optional<String> getToBeStartDisabledNextJobType() {
- return Optional.of("CONSISTENCY_CHECK");
- }
-
- @Override
- public Optional<String> getToBeStoppedPreviousJobType() {
- return Optional.of("CONSISTENCY_CHECK");
- }
-
- @Override
- public PipelineJobInfo getJobInfo(final String jobId) {
- PipelineJobMetaData jobMetaData = new
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
- Collection<String> sourceTables = new LinkedList<>();
- new
PipelineJobConfigurationManager(this).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
- .forEach(each -> each.getEntries().forEach(entry ->
entry.getDataNodes().forEach(dataNode ->
sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
- return new PipelineJobInfo(jobMetaData, null, String.join(",",
sourceTables));
- }
-
- @Override
- public PipelineDataConsistencyChecker buildDataConsistencyChecker(final
PipelineJobConfiguration jobConfig, final TransmissionProcessContext
processContext,
- final
ConsistencyCheckJobItemProgressContext progressContext) {
- return new MigrationDataConsistencyChecker((MigrationJobConfiguration)
jobConfig, processContext, progressContext);
- }
-
- @Override
- public String getType() {
- return "MIGRATION";
- }
-}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
index c3a26100194..fc84d389aef 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
@@ -17,8 +17,24 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
+import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
+import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Optional;
/**
* Migration job type.
@@ -30,9 +46,46 @@ public final class MigrationJobType implements
PipelineJobType {
return "01";
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public YamlMigrationJobConfigurationSwapper
getYamlJobConfigurationSwapper() {
+ return new YamlMigrationJobConfigurationSwapper();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public YamlTransmissionJobItemProgressSwapper
getYamlJobItemProgressSwapper() {
+ return new YamlTransmissionJobItemProgressSwapper();
+ }
+
+ @Override
+ public Class<MigrationJob> getJobClass() {
+ return MigrationJob.class;
+ }
+
+ @Override
+ public Optional<String> getToBeStartDisabledNextJobType() {
+ return Optional.of("CONSISTENCY_CHECK");
+ }
+
+ @Override
+ public Optional<String> getToBeStoppedPreviousJobType() {
+ return Optional.of("CONSISTENCY_CHECK");
+ }
+
+ @Override
+ public PipelineJobInfo getJobInfo(final String jobId) {
+ PipelineJobMetaData jobMetaData = new
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ Collection<String> sourceTables = new LinkedList<>();
+ new PipelineJobConfigurationManager(new
MigrationJobType()).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
+ .forEach(each -> each.getEntries().forEach(entry ->
entry.getDataNodes().forEach(dataNode ->
sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
+ return new PipelineJobInfo(jobMetaData, null, String.join(",",
sourceTables));
+ }
+
@Override
- public PipelineJobOption getOption() {
- return new MigrationJobOption();
+ public PipelineDataConsistencyChecker buildDataConsistencyChecker(final
PipelineJobConfiguration jobConfig, final TransmissionProcessContext
processContext,
+ final
ConsistencyCheckJobItemProgressContext progressContext) {
+ return new MigrationDataConsistencyChecker((MigrationJobConfiguration)
jobConfig, processContext, progressContext);
}
@Override
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 0da06c43777..45e5071185b 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -41,12 +41,11 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
@@ -100,9 +99,9 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
private final PipelineDataSourcePersistService dataSourcePersistService;
public MigrationJobAPI() {
- PipelineJobOption jobOption = new MigrationJobOption();
- jobManager = new PipelineJobManager(jobOption);
- jobConfigManager = new PipelineJobConfigurationManager(jobOption);
+ PipelineJobType jobType = new MigrationJobType();
+ jobManager = new PipelineJobManager(jobType);
+ jobConfigManager = new PipelineJobConfigurationManager(jobType);
dataSourcePersistService = new PipelineDataSourcePersistService();
}
@@ -319,7 +318,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
}
private void cleanTempTableOnRollback(final String jobId) throws
SQLException {
- MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class,
getType()).getOption()).getJobConfiguration(jobId);
+ MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class,
getType())).getJobConfiguration(jobId);
PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
TableAndSchemaNameMapper mapping = new
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
try (
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 125c8433042..c4dcd35b7fb 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -43,7 +43,7 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.Table
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.infra.datanode.DataNode;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
@@ -99,7 +99,7 @@ public final class MigrationDataConsistencyChecker implements
PipelineDataConsis
}
private long getRecordsCount() {
- Map<Integer, TransmissionJobItemProgress> jobProgress = new
TransmissionJobManager(new MigrationJobOption()).getJobProgress(jobConfig);
+ Map<Integer, TransmissionJobItemProgress> jobProgress = new
TransmissionJobManager(new MigrationJobType()).getJobProgress(jobConfig);
return
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(TransmissionJobItemProgress::getProcessedRecordsCount).sum();
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 9d64551061c..4afd70afd90 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -28,9 +28,9 @@ import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.common.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressListener;
import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.common.spi.ingest.dumper.IncrementalDumperCreator;
@@ -54,7 +54,7 @@ import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareT
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
@@ -81,9 +81,9 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public final class MigrationJobPreparer {
- private final MigrationJobOption jobOption = new MigrationJobOption();
+ private final MigrationJobType jobType = new MigrationJobType();
- private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
+ private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
/**
* Do prepare work.
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
index fff58c7361c..cdc5c106c20 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
@@ -37,7 +37,7 @@ public final class AlterTransmissionRuleUpdater implements
RALUpdater<AlterTrans
@Override
public void executeUpdate(final String databaseName, final
AlterTransmissionRuleStatement sqlStatement) {
PipelineProcessConfiguration processConfig =
TransmissionProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
- String jobType = TypedSPILoader.getService(PipelineJobType.class,
sqlStatement.getJobTypeName()).getOption().getType();
+ String jobType = TypedSPILoader.getService(PipelineJobType.class,
sqlStatement.getJobTypeName()).getType();
processConfigPersistService.persist(new
PipelineContextKey(InstanceType.PROXY), jobType, processConfig);
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index a0835aaf4a8..e3b53e10fcf 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -26,8 +26,8 @@ import
org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
-import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.CreateConsistencyCheckJobParameter;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
@@ -51,11 +51,11 @@ import static org.junit.jupiter.api.Assertions.assertNull;
class ConsistencyCheckJobAPITest {
- private final ConsistencyCheckJobOption jobOption = new
ConsistencyCheckJobOption();
+ private final ConsistencyCheckJobType jobType = new
ConsistencyCheckJobType();
- private final ConsistencyCheckJobAPI jobAPI = new
ConsistencyCheckJobAPI(jobOption);
+ private final ConsistencyCheckJobAPI jobAPI = new
ConsistencyCheckJobAPI(jobType);
- private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
+ private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
private final YamlMigrationJobConfigurationSwapper jobConfigSwapper = new
YamlMigrationJobConfigurationSwapper();
@@ -70,7 +70,7 @@ class ConsistencyCheckJobAPITest {
String parentJobId = parentJobConfig.getJobId();
String checkJobId = jobAPI.start(new
CreateConsistencyCheckJobParameter(parentJobId, null, null,
parentJobConfig.getSourceDatabaseType(),
parentJobConfig.getTargetDatabaseType()));
- ConsistencyCheckJobConfiguration checkJobConfig = new
PipelineJobConfigurationManager(jobOption).getJobConfiguration(checkJobId);
+ ConsistencyCheckJobConfiguration checkJobConfig = new
PipelineJobConfigurationManager(jobType).getJobConfiguration(checkJobId);
int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
String expectCheckJobId = new
ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId),
parentJobId, expectedSequence).marshal();
assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 46f0812cdc1..a06584cdf04 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -30,6 +30,7 @@ import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
import
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
@@ -37,15 +38,15 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
@@ -96,7 +97,7 @@ import static org.mockito.Mockito.when;
@StaticMockSettings(PipelineDistributedBarrier.class)
class MigrationJobAPITest {
- private static MigrationJobOption jobOption;
+ private static PipelineJobType jobType;
private static MigrationJobAPI jobAPI;
@@ -113,12 +114,12 @@ class MigrationJobAPITest {
@BeforeAll
static void beforeClass() {
PipelineContextUtils.mockModeConfigAndContextManager();
- jobOption = new MigrationJobOption();
+ jobType = new MigrationJobType();
jobAPI = (MigrationJobAPI)
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
- jobConfigManager = new PipelineJobConfigurationManager(jobOption);
- jobManager = new PipelineJobManager(jobOption);
- transmissionJobManager = new TransmissionJobManager(jobOption);
- jobItemManager = new
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
+ jobConfigManager = new PipelineJobConfigurationManager(jobType);
+ jobManager = new PipelineJobManager(jobType);
+ transmissionJobManager = new TransmissionJobManager(jobType);
+ jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
String jdbcUrl =
"jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
databaseType = DatabaseTypeFactory.get(jdbcUrl);
Map<String, Object> props = new HashMap<>();
@@ -195,9 +196,9 @@ class MigrationJobAPITest {
initTableData(jobConfig);
jobManager.start(jobConfig);
PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.convertWithDefaultValue(
- new
PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
jobOption.getType()));
+ new
PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
jobType.getType()));
TransmissionProcessContext processContext = new
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
- Map<String, TableDataConsistencyCheckResult> checkResultMap =
jobOption.buildDataConsistencyChecker(
+ Map<String, TableDataConsistencyCheckResult> checkResultMap =
jobType.buildDataConsistencyChecker(
jobConfig, processContext, new
ConsistencyCheckJobItemProgressContext(jobConfig.getJobId(), 0,
"H2")).check("FIXTURE", null);
assertThat(checkResultMap.size(), is(1));
String checkKey = "t_order";