This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 21da3a9f67d Refactor CDCJobAPI (#29239)
21da3a9f67d is described below
commit 21da3a9f67d1dfb9ae400ae18f8cc4b44ac368af
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 30 16:08:20 2023 +0800
Refactor CDCJobAPI (#29239)
---
.../data/pipeline/cdc/api/CDCJobAPI.java | 27 ++++++++++++++++------
1 file changed, 20 insertions(+), 7 deletions(-)
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 212a8cc8276..80b8c0b9f97 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -81,13 +81,26 @@ import java.util.stream.Collectors;
@Slf4j
public final class CDCJobAPI implements TransmissionJobAPI {
- private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
+ private final CDCJobOption jobOption;
- private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
+ private final PipelineJobManager jobManager;
- private final YamlPipelineDataSourceConfigurationSwapper pipelineDataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
+ private final PipelineJobConfigurationManager jobConfigManager;
- private final CDCJobOption jobOption = new CDCJobOption();
+ private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper;
+
+ private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine;
+
+ private final YamlPipelineDataSourceConfigurationSwapper pipelineDataSourceConfigSwapper;
+
+ public CDCJobAPI() {
+ jobOption = new CDCJobOption();
+ jobManager = new PipelineJobManager(jobOption);
+ jobConfigManager = new PipelineJobConfigurationManager(jobOption);
+ dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
+ ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
+ pipelineDataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
+ }
/**
* Create CDC job.
@@ -108,7 +121,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId());
} else {
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), jobOption.getJobClass());
- JobConfigurationPOJO jobConfigPOJO = new PipelineJobConfigurationManager(jobOption).convertToJobConfigurationPOJO(jobConfig);
+ JobConfigurationPOJO jobConfigPOJO = jobConfigManager.convertToJobConfigurationPOJO(jobConfig);
jobConfigPOJO.setDisabled(true);
governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(), jobConfigPOJO);
if (!param.isFull()) {
@@ -231,9 +244,9 @@ public final class CDCJobAPI implements TransmissionJobAPI {
* @param jobId job id
*/
public void drop(final String jobId) {
- CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(jobId);
+ CDCJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active"));
- new PipelineJobManager(jobOption).drop(jobId);
+ jobManager.drop(jobId);
cleanup(jobConfig);
}