This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 21da3a9f67d Refactor CDCJobAPI (#29239)
21da3a9f67d is described below

commit 21da3a9f67d1dfb9ae400ae18f8cc4b44ac368af
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 30 16:08:20 2023 +0800

    Refactor CDCJobAPI (#29239)
---
 .../data/pipeline/cdc/api/CDCJobAPI.java           | 27 ++++++++++++++++------
 1 file changed, 20 insertions(+), 7 deletions(-)

diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 212a8cc8276..80b8c0b9f97 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -81,13 +81,26 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class CDCJobAPI implements TransmissionJobAPI {
     
-    private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = 
new YamlDataSourceConfigurationSwapper();
+    private final CDCJobOption jobOption;
     
-    private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine = 
new YamlRuleConfigurationSwapperEngine();
+    private final PipelineJobManager jobManager;
     
-    private final YamlPipelineDataSourceConfigurationSwapper 
pipelineDataSourceConfigSwapper = new 
YamlPipelineDataSourceConfigurationSwapper();
+    private final PipelineJobConfigurationManager jobConfigManager;
     
-    private final CDCJobOption jobOption = new CDCJobOption();
+    private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper;
+    
+    private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine;
+    
+    private final YamlPipelineDataSourceConfigurationSwapper 
pipelineDataSourceConfigSwapper;
+    
+    public CDCJobAPI() {
+        jobOption = new CDCJobOption();
+        jobManager = new PipelineJobManager(jobOption);
+        jobConfigManager = new PipelineJobConfigurationManager(jobOption);
+        dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
+        ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
+        pipelineDataSourceConfigSwapper = new 
YamlPipelineDataSourceConfigurationSwapper();
+    }
     
     /**
      * Create CDC job.
@@ -108,7 +121,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
             log.warn("CDC job already exists in registry center, ignore, job 
id is `{}`", jobConfig.getJobId());
         } else {
             
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), 
jobOption.getJobClass());
-            JobConfigurationPOJO jobConfigPOJO = new 
PipelineJobConfigurationManager(jobOption).convertToJobConfigurationPOJO(jobConfig);
+            JobConfigurationPOJO jobConfigPOJO = 
jobConfigManager.convertToJobConfigurationPOJO(jobConfig);
             jobConfigPOJO.setDisabled(true);
             
governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(),
 jobConfigPOJO);
             if (!param.isFull()) {
@@ -231,9 +244,9 @@ public final class CDCJobAPI implements TransmissionJobAPI {
      * @param jobId job id
      */
     public void drop(final String jobId) {
-        CDCJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(jobOption).getJobConfiguration(jobId);
+        CDCJobConfiguration jobConfig = 
jobConfigManager.getJobConfiguration(jobId);
         
ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(),
 () -> new PipelineInternalException("Can't drop streaming job which is 
active"));
-        new PipelineJobManager(jobOption).drop(jobId);
+        jobManager.drop(jobId);
         cleanup(jobConfig);
     }
     

Reply via email to