This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 92dbc4bcce2 Move AbstractPipelineJobAPIImpl.convertJobConfiguration 
and swapToYamlJobConfiguration to PipelineJobConfiguration (#29030)
92dbc4bcce2 is described below

commit 92dbc4bcce27671c4dd201b4de678a6c7956e442
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Nov 13 23:34:36 2023 +0800

    Move AbstractPipelineJobAPIImpl.convertJobConfiguration and 
swapToYamlJobConfiguration to PipelineJobConfiguration (#29030)
    
    * Use PipelineJobAPI.getPipelineJobClass() instead of 
AbstractPipelineJobAPIImpl.getJobClassName()
    
    * Move AbstractPipelineJobAPIImpl.convertJobConfiguration and 
swapToYamlJobConfiguration to PipelineJobConfiguration
---
 .../config/job/PipelineJobConfiguration.java       | 35 ++++++++++++++++++++++
 .../service/impl/AbstractPipelineJobAPIImpl.java   | 20 +------------
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      | 14 +--------
 .../cdc/config/job/CDCJobConfiguration.java        | 15 ++++++++++
 .../api/impl/ConsistencyCheckJobAPI.java           |  5 ----
 .../config/ConsistencyCheckJobConfiguration.java   | 12 ++++----
 .../migration/api/impl/MigrationJobAPI.java        |  5 ----
 .../config/MigrationJobConfiguration.java          |  7 +++++
 8 files changed, 66 insertions(+), 47 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
index e6f6516b7f4..511c10d2fb9 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
@@ -17,13 +17,23 @@
 
 package org.apache.shardingsphere.data.pipeline.common.config.job;
 
+import 
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Collections;
 
 /**
  * Pipeline job configuration.
  */
 public interface PipelineJobConfiguration {
     
+    DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    
     /**
      * Get job id.
      *
@@ -44,4 +54,29 @@ public interface PipelineJobConfiguration {
      * @return source database type
      */
     DatabaseType getSourceDatabaseType();
+    
+    /**
+     * Convert to job configuration POJO.
+     * 
+     * @return converted job configuration POJO
+     */
+    default JobConfigurationPOJO convertToJobConfigurationPOJO() {
+        JobConfigurationPOJO result = new JobConfigurationPOJO();
+        result.setJobName(getJobId());
+        result.setShardingTotalCount(getJobShardingCount());
+        
result.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration()));
+        String createTimeFormat = 
LocalDateTime.now().format(DATE_TIME_FORMATTER);
+        result.getProps().setProperty("create_time", createTimeFormat);
+        result.getProps().setProperty("start_time_millis", 
String.valueOf(System.currentTimeMillis()));
+        result.getProps().setProperty("run_count", "1");
+        
result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
+        return result;
+    }
+    
+    /**
+     * Swap to YAML pipeline job configuration.
+     * 
+     * @return swapped YAML pipeline job configuration
+     */
+    YamlPipelineJobConfiguration swapToYamlJobConfiguration();
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index fa8a1f78e25..f2ef4ea1754 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -20,9 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.job.service.impl;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
 import 
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
@@ -38,7 +36,6 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
-import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
@@ -61,25 +58,10 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
             return Optional.of(jobId);
         }
         repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), 
getPipelineJobClass().getName());
-        repositoryAPI.persist(jobConfigKey, 
YamlEngine.marshal(convertJobConfiguration(jobConfig)));
+        repositoryAPI.persist(jobConfigKey, 
YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO()));
         return Optional.of(jobId);
     }
     
-    protected JobConfigurationPOJO convertJobConfiguration(final 
PipelineJobConfiguration jobConfig) {
-        JobConfigurationPOJO result = new JobConfigurationPOJO();
-        result.setJobName(jobConfig.getJobId());
-        result.setShardingTotalCount(jobConfig.getJobShardingCount());
-        
result.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration(jobConfig)));
-        String createTimeFormat = 
LocalDateTime.now().format(DATE_TIME_FORMATTER);
-        result.getProps().setProperty("create_time", createTimeFormat);
-        result.getProps().setProperty("start_time_millis", 
String.valueOf(System.currentTimeMillis()));
-        result.getProps().setProperty("run_count", "1");
-        
result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
-        return result;
-    }
-    
-    protected abstract YamlPipelineJobConfiguration 
swapToYamlJobConfiguration(PipelineJobConfiguration jobConfig);
-    
     protected abstract PipelineJobConfiguration 
getJobConfiguration(JobConfigurationPOJO jobConfigPOJO);
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 9c99ae5f096..847987eb0b1 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -123,7 +123,7 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
             log.warn("CDC job already exists in registry center, ignore, 
jobConfigKey={}", jobConfigKey);
         } else {
             
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()),
 getPipelineJobClass().getName());
-            JobConfigurationPOJO jobConfigPOJO = 
convertJobConfiguration(jobConfig);
+            JobConfigurationPOJO jobConfigPOJO = 
jobConfig.convertToJobConfigurationPOJO();
             jobConfigPOJO.setDisabled(true);
             repositoryAPI.persist(jobConfigKey, 
YamlEngine.marshal(jobConfigPOJO));
             if (!param.isFull()) {
@@ -193,13 +193,6 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
         return result;
     }
     
-    @Override
-    protected JobConfigurationPOJO convertJobConfiguration(final 
PipelineJobConfiguration jobConfig) {
-        JobConfigurationPOJO result = super.convertJobConfiguration(jobConfig);
-        result.setShardingTotalCount(1);
-        return result;
-    }
-    
     /**
      * Start job.
      *
@@ -294,11 +287,6 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
         return new 
YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
     }
     
-    @Override
-    protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final 
PipelineJobConfiguration jobConfig) {
-        return new 
YamlCDCJobConfigurationSwapper().swapToYamlConfiguration((CDCJobConfiguration) 
jobConfig);
-    }
-    
     @Override
     public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
index 8e0aa41737f..c15c38e6243 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
@@ -21,8 +21,11 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
+import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
 import java.util.List;
@@ -64,6 +67,18 @@ public final class CDCJobConfiguration implements 
PipelineJobConfiguration {
         return jobShardingDataNodes.size();
     }
     
+    @Override
+    public JobConfigurationPOJO convertToJobConfigurationPOJO() {
+        JobConfigurationPOJO result = 
PipelineJobConfiguration.super.convertToJobConfigurationPOJO();
+        result.setShardingTotalCount(1);
+        return result;
+    }
+    
+    @Override
+    public YamlPipelineJobConfiguration swapToYamlJobConfiguration() {
+        return new 
YamlCDCJobConfigurationSwapper().swapToYamlConfiguration(this);
+    }
+    
     @RequiredArgsConstructor
     @Getter
     public static class SinkConfiguration {
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 58d4051ea08..f267b56aced 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -365,11 +365,6 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
         return new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
     }
     
-    @Override
-    protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final 
PipelineJobConfiguration jobConfig) {
-        return new 
YamlConsistencyCheckJobConfigurationSwapper().swapToYamlConfiguration((ConsistencyCheckJobConfiguration)
 jobConfig);
-    }
-    
     @Override
     public void extendYamlJobConfiguration(final PipelineContextKey 
contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
     }
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
index a969417d96f..578eeeed0e0 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
@@ -21,6 +21,8 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
 import java.util.Properties;
@@ -43,13 +45,13 @@ public final class ConsistencyCheckJobConfiguration 
implements PipelineJobConfig
     
     private final DatabaseType sourceDatabaseType;
     
-    /**
-     * Get job sharding count.
-     *
-     * @return job sharding count
-     */
     @Override
     public int getJobShardingCount() {
         return 1;
     }
+    
+    @Override
+    public YamlPipelineJobConfiguration swapToYamlJobConfiguration() {
+        return new 
YamlConsistencyCheckJobConfigurationSwapper().swapToYamlConfiguration(this);
+    }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 4a6a2b2c37c..ce2e5116377 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -226,11 +226,6 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
         }
     }
     
-    @Override
-    protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final 
PipelineJobConfiguration jobConfig) {
-        return new 
YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration((MigrationJobConfiguration)
 jobConfig);
-    }
-    
     @Override
     public MigrationJobConfiguration getJobConfiguration(final String jobId) {
         return getJobConfiguration(getElasticJobConfigPOJO(jobId));
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
index 9ef3c648b07..f45a30d512c 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
@@ -22,7 +22,9 @@ import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
 import java.util.List;
@@ -67,4 +69,9 @@ public final class MigrationJobConfiguration implements 
PipelineJobConfiguration
     public int getJobShardingCount() {
         return jobShardingDataNodes.size();
     }
+    
+    @Override
+    public YamlPipelineJobConfiguration swapToYamlJobConfiguration() {
+        return new 
YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(this);
+    }
 }

Reply via email to