This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new cbf02fcef3e Replace static yaml swapper to object level (#21397)
cbf02fcef3e is described below

commit cbf02fcef3e44529f847a59b4876a30bef57e307
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Oct 8 23:07:44 2022 +0800

    Replace static yaml swapper to object level (#21397)
---
 .../AbstractInventoryIncrementalJobAPIImpl.java    | 10 +++++-----
 .../api/impl/PipelineDataSourcePersistService.java |  6 +++---
 ...PipelineProcessConfigurationPersistService.java |  6 +++---
 ...InventoryIncrementalJobItemProgressSwapper.java | 12 ++++++------
 .../consistencycheck/ConsistencyCheckJob.java      |  2 +-
 .../ConsistencyCheckJobAPIImpl.java                | 10 +++++-----
 .../scenario/migration/MigrationJobAPIImpl.java    | 22 ++++++++++------------
 ...amlConsistencyCheckJobConfigurationSwapper.java | 10 ++--------
 ...lineProcessConfigurationPersistServiceTest.java |  6 ++----
 9 files changed, 37 insertions(+), 47 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 987bf1eaa96..acb875a682a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -55,7 +55,7 @@ import java.util.stream.IntStream;
 @Slf4j
 public abstract class AbstractInventoryIncrementalJobAPIImpl extends 
AbstractPipelineJobAPIImpl implements InventoryIncrementalJobAPI, 
InventoryIncrementalJobPublicAPI {
     
-    private static final YamlPipelineProcessConfigurationSwapper 
PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
+    private final YamlPipelineProcessConfigurationSwapper swapper = new 
YamlPipelineProcessConfigurationSwapper();
     
     private final PipelineProcessConfigurationPersistService 
processConfigPersistService = new PipelineProcessConfigurationPersistService();
     
@@ -77,14 +77,14 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
     public void alterProcessConfiguration(final PipelineProcessConfiguration 
processConfig) {
         // TODO check rateLimiter type match or not
         YamlPipelineProcessConfiguration targetYamlProcessConfig = 
getTargetYamlProcessConfiguration();
-        
targetYamlProcessConfig.copyNonNullFields(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig));
-        processConfigPersistService.persist(getJobType(), 
PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
+        
targetYamlProcessConfig.copyNonNullFields(swapper.swapToYamlConfiguration(processConfig));
+        processConfigPersistService.persist(getJobType(), 
swapper.swapToObject(targetYamlProcessConfig));
     }
     
     private YamlPipelineProcessConfiguration 
getTargetYamlProcessConfiguration() {
         PipelineProcessConfiguration existingProcessConfig = 
processConfigPersistService.load(getJobType());
         ShardingSpherePreconditions.checkNotNull(existingProcessConfig, 
AlterNotExistProcessConfigurationException::new);
-        return 
PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(existingProcessConfig);
+        return swapper.swapToYamlConfiguration(existingProcessConfig);
     }
     
     @Override
@@ -93,7 +93,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl 
extends AbstractPip
         PipelineProcessConfigurationUtil.verifyConfPath(confPath);
         YamlPipelineProcessConfiguration targetYamlProcessConfig = 
getTargetYamlProcessConfiguration();
         
PipelineProcessConfigurationUtil.setFieldsNullByConfPath(targetYamlProcessConfig,
 finalConfPath);
-        processConfigPersistService.persist(getJobType(), 
PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
+        processConfigPersistService.persist(getJobType(), 
swapper.swapToObject(targetYamlProcessConfig));
     }
     
     @Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
index 4ae5a9a8c44..1696e6a9c6c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
@@ -35,7 +35,7 @@ import java.util.Map.Entry;
  */
 public final class PipelineDataSourcePersistService implements 
PipelineMetaDataPersistService<Map<String, DataSourceProperties>> {
     
-    private static final YamlDataSourceConfigurationSwapper 
DATA_SOURCE_CONFIG_SWAPPER = new YamlDataSourceConfigurationSwapper();
+    private final YamlDataSourceConfigurationSwapper swapper = new 
YamlDataSourceConfigurationSwapper();
     
     @Override
     @SuppressWarnings("unchecked")
@@ -46,7 +46,7 @@ public final class PipelineDataSourcePersistService 
implements PipelineMetaDataP
         }
         Map<String, Map<String, Object>> yamlDataSources = 
YamlEngine.unmarshal(dataSourcesProperties, Map.class);
         Map<String, DataSourceProperties> result = new 
LinkedHashMap<>(yamlDataSources.size());
-        yamlDataSources.forEach((key, value) -> result.put(key, 
DATA_SOURCE_CONFIG_SWAPPER.swapToDataSourceProperties(value)));
+        yamlDataSources.forEach((key, value) -> result.put(key, 
swapper.swapToDataSourceProperties(value)));
         return result;
     }
     
@@ -54,7 +54,7 @@ public final class PipelineDataSourcePersistService 
implements PipelineMetaDataP
     public void persist(final JobType jobType, final Map<String, 
DataSourceProperties> dataSourcePropsMap) {
         Map<String, Map<String, Object>> dataSourceMap = new 
LinkedHashMap<>(dataSourcePropsMap.size());
         for (Entry<String, DataSourceProperties> entry : 
dataSourcePropsMap.entrySet()) {
-            dataSourceMap.put(entry.getKey(), 
DATA_SOURCE_CONFIG_SWAPPER.swapToMap(entry.getValue()));
+            dataSourceMap.put(entry.getKey(), 
swapper.swapToMap(entry.getValue()));
         }
         
PipelineAPIFactory.getGovernanceRepositoryAPI().persistMetaDataDataSources(jobType,
 YamlEngine.marshal(dataSourceMap));
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
index 947bbccfdfd..e51ec1fe6cc 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
@@ -31,7 +31,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
  */
 public final class PipelineProcessConfigurationPersistService implements 
PipelineMetaDataPersistService<PipelineProcessConfiguration> {
     
-    private static final YamlPipelineProcessConfigurationSwapper 
PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
+    private final YamlPipelineProcessConfigurationSwapper swapper = new 
YamlPipelineProcessConfigurationSwapper();
     
     @Override
     public PipelineProcessConfiguration load(final JobType jobType) {
@@ -43,12 +43,12 @@ public final class 
PipelineProcessConfigurationPersistService implements Pipelin
         if (null == yamlConfig || yamlConfig.isAllFieldsNull()) {
             return null;
         }
-        return PROCESS_CONFIG_SWAPPER.swapToObject(yamlConfig);
+        return swapper.swapToObject(yamlConfig);
     }
     
     @Override
     public void persist(final JobType jobType, final 
PipelineProcessConfiguration processConfig) {
-        String yamlText = 
YamlEngine.marshal(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig));
+        String yamlText = 
YamlEngine.marshal(swapper.swapToYamlConfiguration(processConfig));
         
PipelineAPIFactory.getGovernanceRepositoryAPI().persistMetaDataProcessConfiguration(jobType,
 yamlText);
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
index a7e89170ef3..1e31d99d7c7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
@@ -26,9 +26,9 @@ import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwappe
  */
 public final class YamlInventoryIncrementalJobItemProgressSwapper implements 
YamlConfigurationSwapper<YamlInventoryIncrementalJobItemProgress, 
InventoryIncrementalJobItemProgress> {
     
-    private static final YamlJobItemInventoryTasksProgressSwapper 
INVENTORY_PROGRESS_SWAPPER = new YamlJobItemInventoryTasksProgressSwapper();
+    private final YamlJobItemInventoryTasksProgressSwapper 
inventoryTasksProgressSwapper = new YamlJobItemInventoryTasksProgressSwapper();
     
-    private static final YamlJobItemIncrementalTasksProgressSwapper 
INCREMENTAL_PROGRESS_SWAPPER = new YamlJobItemIncrementalTasksProgressSwapper();
+    private final YamlJobItemIncrementalTasksProgressSwapper 
incrementalTasksProgressSwapper = new 
YamlJobItemIncrementalTasksProgressSwapper();
     
     @Override
     public YamlInventoryIncrementalJobItemProgress 
swapToYamlConfiguration(final InventoryIncrementalJobItemProgress progress) {
@@ -36,8 +36,8 @@ public final class 
YamlInventoryIncrementalJobItemProgressSwapper implements Yam
         result.setStatus(progress.getStatus().name());
         result.setSourceDatabaseType(progress.getSourceDatabaseType());
         result.setDataSourceName(progress.getDataSourceName());
-        
result.setInventory(INVENTORY_PROGRESS_SWAPPER.swapToYaml(progress.getInventory()));
-        
result.setIncremental(INCREMENTAL_PROGRESS_SWAPPER.swapToYaml(progress.getIncremental()));
+        
result.setInventory(inventoryTasksProgressSwapper.swapToYaml(progress.getInventory()));
+        
result.setIncremental(incrementalTasksProgressSwapper.swapToYaml(progress.getIncremental()));
         result.setProcessedRecordsCount(progress.getProcessedRecordsCount());
         return result;
     }
@@ -48,8 +48,8 @@ public final class 
YamlInventoryIncrementalJobItemProgressSwapper implements Yam
         result.setStatus(JobStatus.valueOf(yamlProgress.getStatus()));
         result.setSourceDatabaseType(yamlProgress.getSourceDatabaseType());
         result.setDataSourceName(yamlProgress.getDataSourceName());
-        
result.setInventory(INVENTORY_PROGRESS_SWAPPER.swapToObject(yamlProgress.getInventory()));
-        
result.setIncremental(INCREMENTAL_PROGRESS_SWAPPER.swapToObject(yamlProgress.getSourceDatabaseType(),
 yamlProgress.getIncremental()));
+        
result.setInventory(inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()));
+        
result.setIncremental(incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(),
 yamlProgress.getIncremental()));
         
result.setProcessedRecordsCount(yamlProgress.getProcessedRecordsCount());
         return result;
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 8cd64fc1cf5..77355f5281d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -53,7 +53,7 @@ public final class ConsistencyCheckJob extends 
AbstractPipelineJob implements Si
     public void execute(final ShardingContext shardingContext) {
         String checkJobId = shardingContext.getJobName();
         setJobId(checkJobId);
-        ConsistencyCheckJobConfiguration consistencyCheckJobConfig = 
YamlConsistencyCheckJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
+        ConsistencyCheckJobConfiguration consistencyCheckJobConfig = new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         JobStatus status = JobStatus.RUNNING;
         ConsistencyCheckJobItemContext jobItemContext = new 
ConsistencyCheckJobItemContext(consistencyCheckJobConfig, 0, status);
         jobAPI.persistJobItemProgress(jobItemContext);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index 5f1e3fb9c86..b91daee9f9e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -58,7 +58,7 @@ import java.util.Optional;
 @Slf4j
 public final class ConsistencyCheckJobAPIImpl extends 
AbstractPipelineJobAPIImpl implements ConsistencyCheckJobAPI {
     
-    private static final YamlConsistencyCheckJobProgressSwapper 
PROGRESS_SWAPPER = new YamlConsistencyCheckJobProgressSwapper();
+    private final YamlConsistencyCheckJobProgressSwapper swapper = new 
YamlConsistencyCheckJobProgressSwapper();
     
     @Override
     protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
@@ -105,7 +105,7 @@ public final class ConsistencyCheckJobAPIImpl extends 
AbstractPipelineJobAPIImpl
     public void persistJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
         ConsistencyCheckJobProgress jobProgress = new 
ConsistencyCheckJobProgress();
         jobProgress.setStatus(jobItemContext.getStatus());
-        YamlConsistencyCheckJobProgress yamlJobProgress = 
PROGRESS_SWAPPER.swapToYamlConfiguration(jobProgress);
+        YamlConsistencyCheckJobProgress yamlJobProgress = 
swapper.swapToYamlConfiguration(jobProgress);
         
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
     }
     
@@ -115,7 +115,7 @@ public final class ConsistencyCheckJobAPIImpl extends 
AbstractPipelineJobAPIImpl
         if (StringUtils.isBlank(progress)) {
             return null;
         }
-        ConsistencyCheckJobProgress jobProgress = 
PROGRESS_SWAPPER.swapToObject(YamlEngine.unmarshal(progress, 
YamlConsistencyCheckJobProgress.class, true));
+        ConsistencyCheckJobProgress jobProgress = 
swapper.swapToObject(YamlEngine.unmarshal(progress, 
YamlConsistencyCheckJobProgress.class, true));
         ConsistencyCheckJobProgress result = new ConsistencyCheckJobProgress();
         result.setStatus(jobProgress.getStatus());
         return result;
@@ -129,7 +129,7 @@ public final class ConsistencyCheckJobAPIImpl extends 
AbstractPipelineJobAPIImpl
             return;
         }
         jobProgress.setStatus(status);
-        
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, 
shardingItem, 
YamlEngine.marshal(PROGRESS_SWAPPER.swapToYamlConfiguration(jobProgress)));
+        
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, 
shardingItem, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobProgress)));
     }
     
     @Override
@@ -163,7 +163,7 @@ public final class ConsistencyCheckJobAPIImpl extends 
AbstractPipelineJobAPIImpl
     
     @Override
     protected ConsistencyCheckJobConfiguration getJobConfiguration(final 
JobConfigurationPOJO jobConfigPOJO) {
-        return 
YamlConsistencyCheckJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
+        return new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
     }
     
     @Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index e82dbebbaae..89b312097d7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -30,8 +30,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMap
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
@@ -49,8 +47,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
-import 
org.apache.shardingsphere.data.pipeline.yaml.metadata.YamlPipelineColumnMetaData;
-import 
org.apache.shardingsphere.data.pipeline.yaml.metadata.YamlPipelineColumnMetaDataSwapper;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
 import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -67,6 +63,10 @@ import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipe
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.yaml.metadata.YamlPipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.yaml.metadata.YamlPipelineColumnMetaDataSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
 import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
@@ -99,7 +99,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -109,9 +108,9 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class MigrationJobAPIImpl extends 
AbstractInventoryIncrementalJobAPIImpl implements MigrationJobAPI {
     
-    private static final YamlRuleConfigurationSwapperEngine 
RULE_CONFIG_SWAPPER_ENGINE = new YamlRuleConfigurationSwapperEngine();
+    private final YamlRuleConfigurationSwapperEngine swapperEngine = new 
YamlRuleConfigurationSwapperEngine();
     
-    private static final YamlDataSourceConfigurationSwapper 
DATA_SOURCE_CONFIG_SWAPPER = new YamlDataSourceConfigurationSwapper();
+    private final YamlDataSourceConfigurationSwapper swapper = new 
YamlDataSourceConfigurationSwapper();
     
     private final PipelineDataSourcePersistService dataSourcePersistService = 
new PipelineDataSourcePersistService();
     
@@ -372,7 +371,7 @@ public final class MigrationJobAPIImpl extends 
AbstractInventoryIncrementalJobAP
     public String createJobAndStart(final CreateMigrationJobParameter 
parameter) {
         YamlMigrationJobConfiguration result = new 
YamlMigrationJobConfiguration();
         Map<String, DataSourceProperties> metaDataDataSource = 
dataSourcePersistService.load(JobType.MIGRATION);
-        Map<String, Object> sourceDataSourceProps = 
DATA_SOURCE_CONFIG_SWAPPER.swapToMap(metaDataDataSource.get(parameter.getSourceResourceName()));
+        Map<String, Object> sourceDataSourceProps = 
swapper.swapToMap(metaDataDataSource.get(parameter.getSourceResourceName()));
         YamlPipelineDataSourceConfiguration 
sourcePipelineDataSourceConfiguration = 
createYamlPipelineDataSourceConfiguration(StandardPipelineDataSourceConfiguration.TYPE,
                 YamlEngine.marshal(sourceDataSourceProps));
         result.setSource(sourcePipelineDataSourceConfiguration);
@@ -388,7 +387,7 @@ public final class MigrationJobAPIImpl extends 
AbstractInventoryIncrementalJobAP
         Map<String, Map<String, Object>> targetDataSourceProperties = new 
HashMap<>();
         ShardingSphereDatabase targetDatabase = 
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(parameter.getTargetDatabaseName());
         for (Entry<String, DataSource> entry : 
targetDatabase.getResource().getDataSources().entrySet()) {
-            Map<String, Object> dataSourceProps = 
DATA_SOURCE_CONFIG_SWAPPER.swapToMap(DataSourcePropertiesCreator.create(entry.getValue()));
+            Map<String, Object> dataSourceProps = 
swapper.swapToMap(DataSourcePropertiesCreator.create(entry.getValue()));
             targetDataSourceProperties.put(entry.getKey(), dataSourceProps);
         }
         String targetDatabaseName = parameter.getTargetDatabaseName();
@@ -416,7 +415,7 @@ public final class MigrationJobAPIImpl extends 
AbstractInventoryIncrementalJobAP
         YamlRootConfiguration result = new YamlRootConfiguration();
         result.setDatabaseName(databaseName);
         result.setDataSources(yamlDataSources);
-        Collection<YamlRuleConfiguration> yamlRuleConfigurations = 
RULE_CONFIG_SWAPPER_ENGINE.swapToYamlRuleConfigurations(rules);
+        Collection<YamlRuleConfiguration> yamlRuleConfigurations = 
swapperEngine.swapToYamlRuleConfigurations(rules);
         result.setRules(yamlRuleConfigurations);
         return result;
     }
@@ -431,8 +430,7 @@ public final class MigrationJobAPIImpl extends 
AbstractInventoryIncrementalJobAP
     @Override
     public void startDisabledJob(final String jobId) {
         super.startDisabledJob(jobId);
-        Optional<String> optional = 
PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(jobId);
-        optional.ifPresent(s -> 
ConsistencyCheckJobAPIFactory.getInstance().startDisabledJob(s));
+        
PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(jobId).ifPresent(optional
 -> ConsistencyCheckJobAPIFactory.getInstance().startDisabledJob(optional));
     }
     
     @Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlConsistencyCheckJobConfigurationSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlConsistencyCheckJobConfigurationSwapper.java
index f75ca396401..30f6ae20ed2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlConsistencyCheckJobConfigurationSwapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlConsistencyCheckJobConfigurationSwapper.java
@@ -26,8 +26,6 @@ import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwappe
  */
 public final class YamlConsistencyCheckJobConfigurationSwapper implements 
YamlConfigurationSwapper<YamlConsistencyCheckJobConfiguration, 
ConsistencyCheckJobConfiguration> {
     
-    private static final YamlConsistencyCheckJobConfigurationSwapper 
JOB_CONFIG_SWAPPER = new YamlConsistencyCheckJobConfigurationSwapper();
-    
     @Override
     public YamlConsistencyCheckJobConfiguration swapToYamlConfiguration(final 
ConsistencyCheckJobConfiguration data) {
         YamlConsistencyCheckJobConfiguration result = new 
YamlConsistencyCheckJobConfiguration();
@@ -49,11 +47,7 @@ public final class 
YamlConsistencyCheckJobConfigurationSwapper implements YamlCo
      * @param jobParameter job parameter
      * @return job configuration
      */
-    public static ConsistencyCheckJobConfiguration swapToObject(final String 
jobParameter) {
-        if (null == jobParameter) {
-            return null;
-        }
-        YamlConsistencyCheckJobConfiguration yamlJobConfig = 
YamlEngine.unmarshal(jobParameter, YamlConsistencyCheckJobConfiguration.class, 
true);
-        return JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
+    public ConsistencyCheckJobConfiguration swapToObject(final String 
jobParameter) {
+        return null == jobParameter ? null : 
swapToObject(YamlEngine.unmarshal(jobParameter, 
YamlConsistencyCheckJobConfiguration.class, true));
     }
 }
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
index d880f1deb34..dddc2c8ce9f 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
@@ -37,8 +37,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 public final class PipelineProcessConfigurationPersistServiceTest {
     
-    private static final YamlPipelineProcessConfigurationSwapper 
PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
-    
     @BeforeClass
     public static void beforeClass() {
         PipelineContextUtil.mockModeConfigAndContextManager();
@@ -56,11 +54,11 @@ public final class 
PipelineProcessConfigurationPersistServiceTest {
         YamlAlgorithmConfiguration yamlStreamChannel = new 
YamlAlgorithmConfiguration(MemoryPipelineChannelCreator.TYPE, new Properties());
         yamlProcessConfig.setStreamChannel(yamlStreamChannel);
         String expectedYamlText = YamlEngine.marshal(yamlProcessConfig);
-        PipelineProcessConfiguration processConfig = 
PROCESS_CONFIG_SWAPPER.swapToObject(yamlProcessConfig);
+        PipelineProcessConfiguration processConfig = new 
YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
         PipelineProcessConfigurationPersistService persistService = new 
PipelineProcessConfigurationPersistService();
         JobType jobType = JobType.MIGRATION;
         persistService.persist(jobType, processConfig);
-        String actualYamlText = 
YamlEngine.marshal(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(persistService.load(jobType)));
+        String actualYamlText = YamlEngine.marshal(new 
YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(persistService.load(jobType)));
         assertThat(actualYamlText, is(expectedYamlText));
     }
 }

Reply via email to