This is an automated email from the ASF dual-hosted git repository.
linghengqian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e70c639de72 Use PipelineJobAPI.getPipelineJobClass() to instead of
AbstractPipelineJobAPIImpl.getJobClassName() (#29018)
e70c639de72 is described below
commit e70c639de72acde19fe175e8fff3798ebd10f8e7
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Nov 13 02:02:43 2023 +0800
Use PipelineJobAPI.getPipelineJobClass() to instead of
AbstractPipelineJobAPIImpl.getJobClassName() (#29018)
---
.../data/pipeline/core/job/service/PipelineJobAPI.java | 8 ++++++++
.../core/job/service/impl/AbstractPipelineJobAPIImpl.java | 4 +---
.../shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java | 6 +++---
.../consistencycheck/api/impl/ConsistencyCheckJobAPI.java | 4 ++--
.../pipeline/scenario/migration/api/impl/MigrationJobAPI.java | 8 ++++----
5 files changed, 18 insertions(+), 12 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 59c4a42fb7a..9f7fd928521 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
@@ -160,6 +161,13 @@ public interface PipelineJobAPI extends TypedSPI {
*/
void cleanJobItemErrorMessage(String jobId, int shardingItem);
+ /**
+ * Get pipeline job class.
+ *
+ * @return pipeline job class
+ */
+ Class<? extends PipelineJob> getPipelineJobClass();
+
@Override
String getType();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index 5dbd46b1cb9..fa8a1f78e25 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -60,13 +60,11 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
log.warn("jobId already exists in registry center, ignore,
jobConfigKey={}", jobConfigKey);
return Optional.of(jobId);
}
- repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId),
getJobClassName());
+ repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId),
getPipelineJobClass().getName());
repositoryAPI.persist(jobConfigKey,
YamlEngine.marshal(convertJobConfiguration(jobConfig)));
return Optional.of(jobId);
}
- protected abstract String getJobClassName();
-
protected JobConfigurationPOJO convertJobConfiguration(final
PipelineJobConfiguration jobConfig) {
JobConfigurationPOJO result = new JobConfigurationPOJO();
result.setJobName(jobConfig.getJobId());
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 482b05d21d5..9c99ae5f096 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -122,7 +122,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
if (repositoryAPI.isExisted(jobConfigKey)) {
log.warn("CDC job already exists in registry center, ignore,
jobConfigKey={}", jobConfigKey);
} else {
-
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()),
getJobClassName());
+
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()),
getPipelineJobClass().getName());
JobConfigurationPOJO jobConfigPOJO =
convertJobConfiguration(jobConfig);
jobConfigPOJO.setDisabled(true);
repositoryAPI.persist(jobConfigKey,
YamlEngine.marshal(jobConfigPOJO));
@@ -345,8 +345,8 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
}
@Override
- protected String getJobClassName() {
- return CDCJob.class.getName();
+ public Class<CDCJob> getPipelineJobClass() {
+ return CDCJob.class;
}
@Override
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 7a894858c80..58d4051ea08 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -390,8 +390,8 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
}
@Override
- protected String getJobClassName() {
- return ConsistencyCheckJob.class.getName();
+ public Class<ConsistencyCheckJob> getPipelineJobClass() {
+ return ConsistencyCheckJob.class;
}
@Override
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 7c3cbcd2141..4a6a2b2c37c 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -473,12 +473,12 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
}
@Override
- public String getType() {
- return "MIGRATION";
+ public Class<MigrationJob> getPipelineJobClass() {
+ return MigrationJob.class;
}
@Override
- protected String getJobClassName() {
- return MigrationJob.class.getName();
+ public String getType() {
+ return "MIGRATION";
}
}