This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 92d76f156a6 Merge TransmissionJobOption and PipelineJobOption (#29268)
92d76f156a6 is described below
commit 92d76f156a6b528020e57ca159f6ee392bc9a6bf
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 3 13:07:05 2023 +0800
Merge TransmissionJobOption and PipelineJobOption (#29268)
---
.../core/job/option/PipelineJobOption.java | 22 +++++++++
.../core/job/option/TransmissionJobOption.java | 55 ----------------------
.../core/job/service/PipelineJobManager.java | 8 ++--
.../core/job/service/TransmissionJobManager.java | 4 +-
.../data/pipeline/cdc/CDCJobOption.java | 11 ++++-
.../ConsistencyCheckJobOption.java | 16 +++++++
.../task/ConsistencyCheckTasksRunner.java | 4 +-
.../scenario/migration/MigrationJobOption.java | 11 ++++-
.../scenario/migration/api/MigrationJobAPI.java | 4 +-
9 files changed, 66 insertions(+), 69 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
index 29e4f55a863..6b3d4b71e19 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
@@ -18,8 +18,12 @@
package org.apache.shardingsphere.data.pipeline.core.job.option;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
@@ -94,6 +98,24 @@ public interface PipelineJobOption {
return false;
}
+ /**
+ * Get pipeline job info.
+ *
+ * @param jobId job ID
+ * @return pipeline job info
+ */
+ PipelineJobInfo getJobInfo(String jobId);
+
+ /**
+ * Build pipeline data consistency checker.
+ *
+ * @param jobConfig job configuration
+ * @param processContext process context
+ * @param progressContext consistency check job item progress context
+ * @return pipeline data consistency checker
+ */
+ PipelineDataConsistencyChecker
buildDataConsistencyChecker(PipelineJobConfiguration jobConfig,
TransmissionProcessContext processContext,
ConsistencyCheckJobItemProgressContext progressContext);
+
/**
* Get job type.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
deleted file mode 100644
index 587a2ddb5df..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job.option;
-
-import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-
-/**
- * Transmission job option.
- */
-public interface TransmissionJobOption extends PipelineJobOption {
-
- @SuppressWarnings("unchecked")
- @Override
- default YamlTransmissionJobItemProgressSwapper
getYamlJobItemProgressSwapper() {
- return new YamlTransmissionJobItemProgressSwapper();
- }
-
- /**
- * Get pipeline job info.
- *
- * @param jobId job ID
- * @return pipeline job info
- */
- PipelineJobInfo getJobInfo(String jobId);
-
- /**
- * Build pipeline data consistency checker.
- *
- * @param jobConfig job configuration
- * @param processContext process context
- * @param progressContext consistency check job item progress context
- * @return all logic tables check result
- */
- PipelineDataConsistencyChecker
buildDataConsistencyChecker(PipelineJobConfiguration jobConfig,
TransmissionProcessContext processContext,
ConsistencyCheckJobItemProgressContext progressContext);
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 0d7b8c18121..a00564b5bbc 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -33,7 +33,6 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHas
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -174,11 +173,12 @@ public final class PipelineJobManager {
* @return jobs info
*/
public List<PipelineJobInfo> getJobInfos(final PipelineContextKey
contextKey) {
- if (jobOption instanceof TransmissionJobOption) {
+ try {
return
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream()
.filter(each -> !each.getJobName().startsWith("_") &&
jobOption.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
- .map(each -> ((TransmissionJobOption)
jobOption).getJobInfo(each.getJobName())).collect(Collectors.toList());
+ .map(each ->
jobOption.getJobInfo(each.getJobName())).collect(Collectors.toList());
+ } catch (final UnsupportedOperationException ex) {
+ return Collections.emptyList();
}
- return Collections.emptyList();
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index 1a05e91ba13..842a84f2a6e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -25,7 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
+import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import java.util.Collection;
@@ -43,7 +43,7 @@ import java.util.stream.IntStream;
@RequiredArgsConstructor
public final class TransmissionJobManager {
- private final TransmissionJobOption jobOption;
+ private final PipelineJobOption jobOption;
/**
* Get job infos.
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
index e37492fd1de..d2e6854e755 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
@@ -22,19 +22,20 @@ import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguratio
import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
+import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
/**
* CDC job option.
*/
@Slf4j
-public final class CDCJobOption implements TransmissionJobOption {
+public final class CDCJobOption implements PipelineJobOption {
@SuppressWarnings("unchecked")
@Override
@@ -42,6 +43,12 @@ public final class CDCJobOption implements
TransmissionJobOption {
return new YamlCDCJobConfigurationSwapper();
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public YamlTransmissionJobItemProgressSwapper
getYamlJobItemProgressSwapper() {
+ return new YamlTransmissionJobItemProgressSwapper();
+ }
+
@Override
public Class<CDCJob> getJobClass() {
return CDCJob.class;
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java
index 16f2070a9c1..d5a1caaf312 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java
@@ -17,7 +17,12 @@
package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
@@ -48,6 +53,17 @@ public final class ConsistencyCheckJobOption implements
PipelineJobOption {
return true;
}
+ @Override
+ public PipelineJobInfo getJobInfo(final String jobId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PipelineDataConsistencyChecker buildDataConsistencyChecker(final
PipelineJobConfiguration jobConfig,
+ final
TransmissionProcessContext processContext, final
ConsistencyCheckJobItemProgressContext progressContext) {
+ return null;
+ }
+
@Override
public String getType() {
return "CONSISTENCY_CHECK";
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 63d9849ce92..35accd7c08d 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -34,7 +34,7 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDat
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
+import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
@@ -108,7 +108,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
protected void runBlocking() {
jobItemManager.persistProgress(jobItemContext);
PipelineJobType jobType =
PipelineJobIdUtils.parseJobType(parentJobId);
- TransmissionJobOption jobOption = (TransmissionJobOption)
TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption();
+ PipelineJobOption jobOption =
TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption();
PipelineJobConfiguration parentJobConfig = new
PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId);
try {
PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.convertWithDefaultValue(
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
index 378b8df3d5d..31e36e2879b 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
@@ -21,12 +21,13 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
+import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -40,7 +41,7 @@ import java.util.Optional;
* Migration job option.
*/
@Slf4j
-public final class MigrationJobOption implements TransmissionJobOption {
+public final class MigrationJobOption implements PipelineJobOption {
@SuppressWarnings("unchecked")
@Override
@@ -48,6 +49,12 @@ public final class MigrationJobOption implements
TransmissionJobOption {
return new YamlMigrationJobConfigurationSwapper();
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public YamlTransmissionJobItemProgressSwapper
getYamlJobItemProgressSwapper() {
+ return new YamlTransmissionJobItemProgressSwapper();
+ }
+
@Override
public Class<MigrationJob> getJobClass() {
return MigrationJob.class;
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 84e63ee2345..0da06c43777 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -41,7 +41,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
+import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
@@ -100,7 +100,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
private final PipelineDataSourcePersistService dataSourcePersistService;
public MigrationJobAPI() {
- TransmissionJobOption jobOption = new MigrationJobOption();
+ PipelineJobOption jobOption = new MigrationJobOption();
jobManager = new PipelineJobManager(jobOption);
jobConfigManager = new PipelineJobConfigurationManager(jobOption);
dataSourcePersistService = new PipelineDataSourcePersistService();