This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 92d76f156a6 Merge TransmissionJobOption and PipelineJobOption (#29268)
92d76f156a6 is described below

commit 92d76f156a6b528020e57ca159f6ee392bc9a6bf
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 3 13:07:05 2023 +0800

    Merge TransmissionJobOption and PipelineJobOption (#29268)
---
 .../core/job/option/PipelineJobOption.java         | 22 +++++++++
 .../core/job/option/TransmissionJobOption.java     | 55 ----------------------
 .../core/job/service/PipelineJobManager.java       |  8 ++--
 .../core/job/service/TransmissionJobManager.java   |  4 +-
 .../data/pipeline/cdc/CDCJobOption.java            | 11 ++++-
 .../ConsistencyCheckJobOption.java                 | 16 +++++++
 .../task/ConsistencyCheckTasksRunner.java          |  4 +-
 .../scenario/migration/MigrationJobOption.java     | 11 ++++-
 .../scenario/migration/api/MigrationJobAPI.java    |  4 +-
 9 files changed, 66 insertions(+), 69 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
index 29e4f55a863..6b3d4b71e19 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
@@ -18,8 +18,12 @@
 package org.apache.shardingsphere.data.pipeline.core.job.option;
 
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
@@ -94,6 +98,24 @@ public interface PipelineJobOption {
         return false;
     }
     
+    /**
+     * Get pipeline job info.
+     *
+     * @param jobId job ID
+     * @return pipeline job info
+     */
+    PipelineJobInfo getJobInfo(String jobId);
+    
+    /**
+     * Build pipeline data consistency checker.
+     *
+     * @param jobConfig job configuration
+     * @param processContext process context
+     * @param progressContext consistency check job item progress context
+     * @return all logic tables check result
+     */
+    PipelineDataConsistencyChecker 
buildDataConsistencyChecker(PipelineJobConfiguration jobConfig, 
TransmissionProcessContext processContext, 
ConsistencyCheckJobItemProgressContext progressContext);
+    
     /**
      * Get job type.
      * 
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
deleted file mode 100644
index 587a2ddb5df..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job.option;
-
-import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-
-/**
- * Transmission job option.
- */
-public interface TransmissionJobOption extends PipelineJobOption {
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    default YamlTransmissionJobItemProgressSwapper 
getYamlJobItemProgressSwapper() {
-        return new YamlTransmissionJobItemProgressSwapper();
-    }
-    
-    /**
-     * Get pipeline job info.
-     *
-     * @param jobId job ID
-     * @return pipeline job info
-     */
-    PipelineJobInfo getJobInfo(String jobId);
-    
-    /**
-     * Build pipeline data consistency checker.
-     *
-     * @param jobConfig job configuration
-     * @param processContext process context
-     * @param progressContext consistency check job item progress context
-     * @return all logic tables check result
-     */
-    PipelineDataConsistencyChecker 
buildDataConsistencyChecker(PipelineJobConfiguration jobConfig, 
TransmissionProcessContext processContext, 
ConsistencyCheckJobItemProgressContext progressContext);
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 0d7b8c18121..a00564b5bbc 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -33,7 +33,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHas
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -174,11 +173,12 @@ public final class PipelineJobManager {
      * @return jobs info
      */
     public List<PipelineJobInfo> getJobInfos(final PipelineContextKey 
contextKey) {
-        if (jobOption instanceof TransmissionJobOption) {
+        try {
             return 
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream()
                     .filter(each -> !each.getJobName().startsWith("_") && 
jobOption.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
-                    .map(each -> ((TransmissionJobOption) 
jobOption).getJobInfo(each.getJobName())).collect(Collectors.toList());
+                    .map(each -> 
jobOption.getJobInfo(each.getJobName())).collect(Collectors.toList());
+        } catch (final UnsupportedOperationException ex) {
+            return Collections.emptyList();
         }
-        return Collections.emptyList();
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index 1a05e91ba13..842a84f2a6e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -25,7 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
+import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 
 import java.util.Collection;
@@ -43,7 +43,7 @@ import java.util.stream.IntStream;
 @RequiredArgsConstructor
 public final class TransmissionJobManager {
     
-    private final TransmissionJobOption jobOption;
+    private final PipelineJobOption jobOption;
     
     /**
      * Get job infos.
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
index e37492fd1de..d2e6854e755 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
@@ -22,19 +22,20 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguratio
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
+import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 
 /**
  * CDC job option.
  */
 @Slf4j
-public final class CDCJobOption implements TransmissionJobOption {
+public final class CDCJobOption implements PipelineJobOption {
     
     @SuppressWarnings("unchecked")
     @Override
@@ -42,6 +43,12 @@ public final class CDCJobOption implements 
TransmissionJobOption {
         return new YamlCDCJobConfigurationSwapper();
     }
     
+    @SuppressWarnings("unchecked")
+    @Override
+    public YamlTransmissionJobItemProgressSwapper 
getYamlJobItemProgressSwapper() {
+        return new YamlTransmissionJobItemProgressSwapper();
+    }
+    
     @Override
     public Class<CDCJob> getJobClass() {
         return CDCJob.class;
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java
index 16f2070a9c1..d5a1caaf312 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java
@@ -17,7 +17,12 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
+import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
 
@@ -48,6 +53,17 @@ public final class ConsistencyCheckJobOption implements 
PipelineJobOption {
         return true;
     }
     
+    @Override
+    public PipelineJobInfo getJobInfo(final String jobId) {
+        throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
PipelineJobConfiguration jobConfig,
+                                                                      final 
TransmissionProcessContext processContext, final 
ConsistencyCheckJobItemProgressContext progressContext) {
+        return null;
+    }
+    
     @Override
     public String getType() {
         return "CONSISTENCY_CHECK";
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 63d9849ce92..35accd7c08d 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -34,7 +34,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDat
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
+import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
@@ -108,7 +108,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
         protected void runBlocking() {
             jobItemManager.persistProgress(jobItemContext);
             PipelineJobType jobType = 
PipelineJobIdUtils.parseJobType(parentJobId);
-            TransmissionJobOption jobOption = (TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption();
+            PipelineJobOption jobOption = 
TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption();
             PipelineJobConfiguration parentJobConfig = new 
PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId);
             try {
                 PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
index 378b8df3d5d..31e36e2879b 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
@@ -21,12 +21,13 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
+import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -40,7 +41,7 @@ import java.util.Optional;
  * Migration job option.
  */
 @Slf4j
-public final class MigrationJobOption implements TransmissionJobOption {
+public final class MigrationJobOption implements PipelineJobOption {
     
     @SuppressWarnings("unchecked")
     @Override
@@ -48,6 +49,12 @@ public final class MigrationJobOption implements 
TransmissionJobOption {
         return new YamlMigrationJobConfigurationSwapper();
     }
     
+    @SuppressWarnings("unchecked")
+    @Override
+    public YamlTransmissionJobItemProgressSwapper 
getYamlJobItemProgressSwapper() {
+        return new YamlTransmissionJobItemProgressSwapper();
+    }
+    
     @Override
     public Class<MigrationJob> getJobClass() {
         return MigrationJob.class;
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 84e63ee2345..0da06c43777 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -41,7 +41,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
+import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
@@ -100,7 +100,7 @@ public final class MigrationJobAPI implements 
TransmissionJobAPI {
     private final PipelineDataSourcePersistService dataSourcePersistService;
     
     public MigrationJobAPI() {
-        TransmissionJobOption jobOption = new MigrationJobOption();
+        PipelineJobOption jobOption = new MigrationJobOption();
         jobManager = new PipelineJobManager(jobOption);
         jobConfigManager = new PipelineJobConfigurationManager(jobOption);
         dataSourcePersistService = new PipelineDataSourcePersistService();

Reply via email to