This is an automated email from the ASF dual-hosted git repository.

linghengqian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e70c639de72 Use PipelineJobAPI.getPipelineJobClass() instead of 
AbstractPipelineJobAPIImpl.getJobClassName() (#29018)
e70c639de72 is described below

commit e70c639de72acde19fe175e8fff3798ebd10f8e7
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Nov 13 02:02:43 2023 +0800

    Use PipelineJobAPI.getPipelineJobClass() instead of 
AbstractPipelineJobAPIImpl.getJobClassName() (#29018)
---
 .../data/pipeline/core/job/service/PipelineJobAPI.java            | 8 ++++++++
 .../core/job/service/impl/AbstractPipelineJobAPIImpl.java         | 4 +---
 .../shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java      | 6 +++---
 .../consistencycheck/api/impl/ConsistencyCheckJobAPI.java         | 4 ++--
 .../pipeline/scenario/migration/api/impl/MigrationJobAPI.java     | 8 ++++----
 5 files changed, 18 insertions(+), 12 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 59c4a42fb7a..9f7fd928521 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
@@ -160,6 +161,13 @@ public interface PipelineJobAPI extends TypedSPI {
      */
     void cleanJobItemErrorMessage(String jobId, int shardingItem);
     
+    /**
+     * Get pipeline job class.
+     * 
+     * @return pipeline job class
+     */
+    Class<? extends PipelineJob> getPipelineJobClass();
+    
     @Override
     String getType();
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index 5dbd46b1cb9..fa8a1f78e25 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -60,13 +60,11 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
             log.warn("jobId already exists in registry center, ignore, 
jobConfigKey={}", jobConfigKey);
             return Optional.of(jobId);
         }
-        repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), 
getJobClassName());
+        repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), 
getPipelineJobClass().getName());
         repositoryAPI.persist(jobConfigKey, 
YamlEngine.marshal(convertJobConfiguration(jobConfig)));
         return Optional.of(jobId);
     }
     
-    protected abstract String getJobClassName();
-    
     protected JobConfigurationPOJO convertJobConfiguration(final 
PipelineJobConfiguration jobConfig) {
         JobConfigurationPOJO result = new JobConfigurationPOJO();
         result.setJobName(jobConfig.getJobId());
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 482b05d21d5..9c99ae5f096 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -122,7 +122,7 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
         if (repositoryAPI.isExisted(jobConfigKey)) {
             log.warn("CDC job already exists in registry center, ignore, 
jobConfigKey={}", jobConfigKey);
         } else {
-            
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()),
 getJobClassName());
+            
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()),
 getPipelineJobClass().getName());
             JobConfigurationPOJO jobConfigPOJO = 
convertJobConfiguration(jobConfig);
             jobConfigPOJO.setDisabled(true);
             repositoryAPI.persist(jobConfigKey, 
YamlEngine.marshal(jobConfigPOJO));
@@ -345,8 +345,8 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
     }
     
     @Override
-    protected String getJobClassName() {
-        return CDCJob.class.getName();
+    public Class<CDCJob> getPipelineJobClass() {
+        return CDCJob.class;
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 7a894858c80..58d4051ea08 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -390,8 +390,8 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
     }
     
     @Override
-    protected String getJobClassName() {
-        return ConsistencyCheckJob.class.getName();
+    public Class<ConsistencyCheckJob> getPipelineJobClass() {
+        return ConsistencyCheckJob.class;
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 7c3cbcd2141..4a6a2b2c37c 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -473,12 +473,12 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
     }
     
     @Override
-    public String getType() {
-        return "MIGRATION";
+    public Class<MigrationJob> getPipelineJobClass() {
+        return MigrationJob.class;
     }
     
     @Override
-    protected String getJobClassName() {
-        return MigrationJob.class.getName();
+    public String getType() {
+        return "MIGRATION";
     }
 }

Reply via email to