This is an automated email from the ASF dual-hosted git repository.

machen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c4d1fd2aa78 Impl create/alter/show ProcessConfiguration in pipeline job API (#20317)
c4d1fd2aa78 is described below

commit c4d1fd2aa78e6654653c32aaded1d8a9752d87a4
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Aug 20 16:44:44 2022 +0800

    Impl create/alter/show ProcessConfiguration in pipeline job API (#20317)
    
    * Extract PipelineMetaDataPersistService
    
    * Unify addMigrationSourceResources parameter name
    
    * Unify meta data dataSources naming
    
    * Add PipelineProcessConfigurationPersistService
    
    * Impl create/alter/show ProcessConfiguration in pipeline job API
---
 .../pipeline/YamlPipelineProcessConfiguration.java | 22 ++++++++
 .../pipeline/YamlPipelineReadConfiguration.java    | 23 ++++++++
 .../pipeline/YamlPipelineWriteConfiguration.java   | 20 +++++++
 .../data/pipeline/api/MigrationJobPublicAPI.java   |  8 +--
 .../data/pipeline/api/PipelineJobPublicAPI.java    | 22 ++++++++
 .../pipeline/core/api/GovernanceRepositoryAPI.java | 28 +++++++--
 ...PI.java => PipelineMetaDataPersistService.java} | 25 ++++----
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  | 45 ++++++++++++++-
 .../core/api/impl/GovernanceRepositoryAPIImpl.java | 16 +++++-
 ....java => PipelineDataSourcePersistService.java} | 18 +++---
 ...PipelineProcessConfigurationPersistService.java | 50 ++++++++++++++++
 .../context/AbstractPipelineProcessContext.java    | 33 +----------
 .../core/exception/PipelineMetaDataException.java  | 23 +++-----
 .../core/metadata/node/PipelineMetaDataNode.java   |  6 +-
 .../util/PipelineProcessConfigurationUtils.java    | 63 +++++++++++++++++++++
 .../scenario/migration/MigrationJobAPIImpl.java    | 32 ++++++-----
 .../core/fixture/MigrationJobAPIFixture.java       | 15 ++++-
 .../core/api/impl/MigrationJobAPIImplTest.java     |  5 +-
 ...lineProcessConfigurationPersistServiceTest.java | 66 ++++++++++++++++++++++
 19 files changed, 417 insertions(+), 103 deletions(-)

diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
index 3cd18b1864e..07e1d1f20ca 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
@@ -36,4 +36,26 @@ public final class YamlPipelineProcessConfiguration 
implements YamlConfiguration
     private YamlPipelineWriteConfiguration write;
     
     private YamlAlgorithmConfiguration streamChannel;
+    
+    /**
+     * Merge the given configuration into this one.
+     *
+     * <p>{@code read} and {@code write} are adopted as-is when this configuration has none,
+     * otherwise they are merged through their own {@code copyNonNullFields}.
+     * A non-null {@code streamChannel} of {@code another} replaces the current one.</p>
+     *
+     * @param another another configuration, nullable; nothing is copied when it is null
+     */
+    // TODO add unit test
+    public void copyNonNullFields(final YamlPipelineProcessConfiguration another) {
+        if (null == another) {
+            return;
+        }
+        if (null == read) {
+            read = another.getRead();
+        } else {
+            read.copyNonNullFields(another.getRead());
+        }
+        if (null == write) {
+            write = another.getWrite();
+        } else {
+            write.copyNonNullFields(another.getWrite());
+        }
+        // A non-null incoming value must win, otherwise an existing stream channel could never be altered.
+        if (null != another.getStreamChannel()) {
+            streamChannel = another.getStreamChannel();
+        }
+    }
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineReadConfiguration.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineReadConfiguration.java
index 4e13aabca34..7f392856597 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineReadConfiguration.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineReadConfiguration.java
@@ -64,4 +64,27 @@ public final class YamlPipelineReadConfiguration implements 
YamlConfiguration {
             shardingSize = DEFAULT_SHARDING_SIZE;
         }
     }
+    
+    /**
+     * Overwrite fields of this configuration with the non-null fields of another one.
+     *
+     * @param another configuration to copy from, ignored when null
+     */
+    public void copyNonNullFields(final YamlPipelineReadConfiguration another) {
+        if (null != another) {
+            // Each field keeps its current value unless the incoming one is set.
+            workerThread = null == another.getWorkerThread() ? workerThread : another.getWorkerThread();
+            batchSize = null == another.getBatchSize() ? batchSize : another.getBatchSize();
+            shardingSize = null == another.getShardingSize() ? shardingSize : another.getShardingSize();
+            rateLimiter = null == another.getRateLimiter() ? rateLimiter : another.getRateLimiter();
+        }
+    }
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineWriteConfiguration.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineWriteConfiguration.java
index fb5460ae6b6..ef0d488826c 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineWriteConfiguration.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineWriteConfiguration.java
@@ -57,4 +57,24 @@ public final class YamlPipelineWriteConfiguration implements 
YamlConfiguration {
             batchSize = DEFAULT_BATCH_SIZE;
         }
     }
+    
+    /**
+     * Overwrite fields of this configuration with the non-null fields of another one.
+     *
+     * @param another configuration to copy from, ignored when null
+     */
+    public void copyNonNullFields(final YamlPipelineWriteConfiguration another) {
+        if (null != another) {
+            // Each field keeps its current value unless the incoming one is set.
+            workerThread = null == another.getWorkerThread() ? workerThread : another.getWorkerThread();
+            batchSize = null == another.getBatchSize() ? batchSize : another.getBatchSize();
+            rateLimiter = null == another.getRateLimiter() ? rateLimiter : another.getRateLimiter();
+        }
+    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
index f370af21eba..a31bd8aa037 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
@@ -114,14 +114,14 @@ public interface MigrationJobPublicAPI extends 
PipelineJobPublicAPI, RequiredSPI
     void reset(String jobId);
     
     /**
-     * Update migration source resource.
+     * Add migration source resources.
      *
-     * @param sourcePropertiesMap source properties map
+     * @param dataSourcePropsMap data source properties map
      */
-    void addMigrationSourceResources(Map<String, DataSourceProperties> 
sourcePropertiesMap);
+    void addMigrationSourceResources(Map<String, DataSourceProperties> 
dataSourcePropsMap);
     
     /**
-     * Drop migration source resource.
+     * Drop migration source resources.
      *
      * @param resourceNames resource names
      */
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
index 13c9aa5de7e..1d7903a9a24 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.api;
 
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
 import java.util.List;
@@ -27,6 +28,27 @@ import java.util.List;
  */
 public interface PipelineJobPublicAPI extends TypedSPI {
     
+    /**
+     * Create process configuration.
+     *
+     * <p>NOTE(review): {@code AbstractPipelineJobAPIImpl} throws {@code PipelineMetaDataException}
+     * when a configuration is already persisted for its job type; confirm whether that is part of this contract.</p>
+     *
+     * @param processConfig process configuration
+     */
+    void createProcessConfiguration(PipelineProcessConfiguration 
processConfig);
+    
+    /**
+     * Alter process configuration.
+     *
+     * <p>NOTE(review): {@code AbstractPipelineJobAPIImpl} merges the given configuration into the persisted one
+     * through {@code copyNonNullFields} and throws {@code PipelineMetaDataException} when nothing is persisted yet;
+     * confirm whether every implementation is expected to behave this way.</p>
+     *
+     * @param processConfig process configuration
+     */
+    void alterProcessConfiguration(PipelineProcessConfiguration processConfig);
+    
+    /**
+     * Show process configuration.
+     *
+     * <p>NOTE(review): {@code AbstractPipelineJobAPIImpl} passes the persisted value through
+     * {@code PipelineProcessConfigurationUtils.convertWithDefaultValue} before returning it; presumably unset
+     * settings are filled with defaults, verify against that utility.</p>
+     *
+     * @return process configuration
+     */
+    PipelineProcessConfiguration showProcessConfiguration();
+    
     /**
      * Start disabled job.
      *
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index a1181ada319..d745f71b1e3 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -110,18 +110,34 @@ public interface GovernanceRepositoryAPI {
     List<Integer> getShardingItems(String jobId);
     
     /**
-     * Get migration source data source.
+     * Get meta data data sources.
      *
      * @param jobType job type
-     * @return migration source data source
+     * @return data source properties
      */
-    String getMetaDataDataSource(JobType jobType);
+    String getMetaDataDataSources(JobType jobType);
     
     /**
-     * Persist meta data data source.
+     * Persist meta data data sources.
      *
      * @param jobType job type
-     * @param metaDataDataSource meta data data source
+     * @param metaDataDataSources data source properties
      */
-    void persistMetaDataDataSource(JobType jobType, String metaDataDataSource);
+    void persistMetaDataDataSources(JobType jobType, String 
metaDataDataSources);
+    
+    /**
+     * Get meta data process configuration.
+     *
+     * @param jobType job type, nullable
+     * @return process configuration YAML text; the persist service in this commit treats blank text as "not configured"
+     */
+    String getMetaDataProcessConfiguration(JobType jobType);
+    
+    /**
+     * Persist meta data process configuration.
+     *
+     * @param jobType job type, nullable
+     * @param processConfigYamlText process configuration YAML text, i.e. the marshalled YAML process configuration
+     */
+    void persistMetaDataProcessConfiguration(JobType jobType, String 
processConfigYamlText);
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineResourceAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
similarity index 64%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineResourceAPI.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
index 81d95798cc1..517fbe08fed 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineResourceAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
@@ -18,28 +18,27 @@
 package org.apache.shardingsphere.data.pipeline.core.api;
 
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
-
-import java.util.Map;
 
 /**
- * Pipeline resource API.
+ * Pipeline meta data persist service.
+ *
+ * @param <T> type of configuration
  */
-public interface PipelineResourceAPI {
+public interface PipelineMetaDataPersistService<T> {
     
     /**
-     * Get meta data data source.
+     * Load meta data.
      *
-     * @param jobType job type
-     * @return meta data data source
+     * @param jobType job type, nullable
+     * @return configurations
      */
-    Map<String, DataSourceProperties> getMetaDataDataSource(JobType jobType);
+    T load(JobType jobType);
     
     /**
-     * Persist meta data data source.
+     * Persist meta data.
      *
-     * @param jobType job type
-     * @param dataSource data source
+     * @param jobType job type, nullable
+     * @param configs configurations
      */
-    void persistMetaDataDataSource(JobType jobType, Map<String, 
DataSourceProperties> dataSource);
+    void persist(JobType jobType, T configs);
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index f2f6a3cb583..d1a3b38da4e 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -18,12 +18,10 @@
 package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
 import com.google.common.base.Preconditions;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -32,20 +30,28 @@ import 
org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobNotFoundException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineMetaDataException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineProcessConfigurationUtils;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineProcessConfigurationSwapper;
 
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Abstract pipeline job API impl.
@@ -55,8 +61,41 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
     
     protected static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
     
+    private static final YamlPipelineProcessConfigurationSwapper 
PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
+    
+    private final PipelineProcessConfigurationPersistService 
processConfigPersistService = new PipelineProcessConfigurationPersistService();
+    
     private final PipelineDistributedBarrier pipelineDistributedBarrier = 
PipelineDistributedBarrier.getInstance();
     
+    /**
+     * Get job type.
+     *
+     * <p>The returned value is passed to the process configuration persist service on every load and persist.</p>
+     *
+     * @return job type
+     */
+    protected abstract JobType getJobType();
+    
+    @Override
+    public void createProcessConfiguration(final PipelineProcessConfiguration processConfig) {
+        // Creation is rejected instead of overwriting a configuration that is already persisted.
+        if (null != processConfigPersistService.load(getJobType())) {
+            throw new PipelineMetaDataException("Process configuration already exists");
+        }
+        processConfigPersistService.persist(getJobType(), processConfig);
+    }
+    
+    @Override
+    public void alterProcessConfiguration(final PipelineProcessConfiguration processConfig) {
+        PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
+        // Altering requires a previously created configuration to merge into.
+        if (null == existingProcessConfig) {
+            throw new PipelineMetaDataException("Process configuration does not exist");
+        }
+        // The merge is done on the YAML form: the new configuration is copied onto the persisted one via copyNonNullFields.
+        YamlPipelineProcessConfiguration targetYamlProcessConfig = PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(existingProcessConfig);
+        targetYamlProcessConfig.copyNonNullFields(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig));
+        processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
+    }
+    
+    @Override
+    public PipelineProcessConfiguration showProcessConfiguration() {
+        // The persisted value (possibly null) is handed to the utility, whose result is returned unchanged.
+        return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(getJobType()));
+    }
+    
     @Override
     public final String marshalJobId(final PipelineJobId pipelineJobId) {
         return PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + 
marshalJobIdLeftPart(pipelineJobId);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 09ed414e51a..a6dbbd1cc5b 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -95,12 +95,22 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
     }
     
     @Override
-    public String getMetaDataDataSource(final JobType jobType) {
+    public String getMetaDataDataSources(final JobType jobType) {
         return 
repository.get(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType));
     }
     
     @Override
-    public void persistMetaDataDataSource(final JobType jobType, final String 
metaDataDataSource) {
-        
repository.persist(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType), 
metaDataDataSource);
+    public void persistMetaDataDataSources(final JobType jobType, final String 
metaDataDataSources) {
+        
repository.persist(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType), 
metaDataDataSources);
+    }
+    
+    @Override
+    public String getMetaDataProcessConfiguration(final JobType jobType) {
+        // Resolve the repository node for this job type, then read its content.
+        String processConfigPath = PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType);
+        return repository.get(processConfigPath);
+    }
+    
+    @Override
+    public void persistMetaDataProcessConfiguration(final JobType jobType, final String processConfigYamlText) {
+        // Resolve the repository node for this job type, then write the YAML text to it.
+        String processConfigPath = PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType);
+        repository.persist(processConfigPath, processConfigYamlText);
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineResourceAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
similarity index 81%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineResourceAPIImpl.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
index 00bc24ecbef..4ae5a9a8c44 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineResourceAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.api.impl;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineResourceAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.api.PipelineMetaDataPersistService;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
@@ -31,16 +31,16 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 /**
- * Pipeline resource API implementation.
+ * Pipeline data source persist service.
  */
-public final class PipelineResourceAPIImpl implements PipelineResourceAPI {
+public final class PipelineDataSourcePersistService implements 
PipelineMetaDataPersistService<Map<String, DataSourceProperties>> {
     
     private static final YamlDataSourceConfigurationSwapper 
DATA_SOURCE_CONFIG_SWAPPER = new YamlDataSourceConfigurationSwapper();
     
     @Override
     @SuppressWarnings("unchecked")
-    public Map<String, DataSourceProperties> getMetaDataDataSource(final 
JobType jobType) {
-        String dataSourcesProperties = 
PipelineAPIFactory.getGovernanceRepositoryAPI().getMetaDataDataSource(jobType);
+    public Map<String, DataSourceProperties> load(final JobType jobType) {
+        String dataSourcesProperties = 
PipelineAPIFactory.getGovernanceRepositoryAPI().getMetaDataDataSources(jobType);
         if (StringUtils.isBlank(dataSourcesProperties)) {
             return Collections.emptyMap();
         }
@@ -51,11 +51,11 @@ public final class PipelineResourceAPIImpl implements 
PipelineResourceAPI {
     }
     
     @Override
-    public void persistMetaDataDataSource(final JobType jobType, final 
Map<String, DataSourceProperties> dataSourceConfigs) {
-        Map<String, Map<String, Object>> dataSourceMap = new 
LinkedHashMap<>(dataSourceConfigs.size());
-        for (Entry<String, DataSourceProperties> entry : 
dataSourceConfigs.entrySet()) {
+    public void persist(final JobType jobType, final Map<String, 
DataSourceProperties> dataSourcePropsMap) {
+        Map<String, Map<String, Object>> dataSourceMap = new 
LinkedHashMap<>(dataSourcePropsMap.size());
+        for (Entry<String, DataSourceProperties> entry : 
dataSourcePropsMap.entrySet()) {
             dataSourceMap.put(entry.getKey(), 
DATA_SOURCE_CONFIG_SWAPPER.swapToMap(entry.getValue()));
         }
-        
PipelineAPIFactory.getGovernanceRepositoryAPI().persistMetaDataDataSource(jobType,
 YamlEngine.marshal(dataSourceMap));
+        
PipelineAPIFactory.getGovernanceRepositoryAPI().persistMetaDataDataSources(jobType,
 YamlEngine.marshal(dataSourceMap));
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
new file mode 100644
index 00000000000..e57e08c89dd
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.api.PipelineMetaDataPersistService;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineProcessConfigurationSwapper;
+
+/**
+ * Persist service of pipeline process configuration, which is exchanged with the governance repository as YAML text.
+ */
+public final class PipelineProcessConfigurationPersistService implements PipelineMetaDataPersistService<PipelineProcessConfiguration> {
+    
+    private static final YamlPipelineProcessConfigurationSwapper PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
+    
+    @Override
+    public PipelineProcessConfiguration load(final JobType jobType) {
+        String yamlText = PipelineAPIFactory.getGovernanceRepositoryAPI().getMetaDataProcessConfiguration(jobType);
+        // Blank text is reported to the caller as "no configuration" (null).
+        if (StringUtils.isBlank(yamlText)) {
+            return null;
+        }
+        YamlPipelineProcessConfiguration yamlProcessConfig = YamlEngine.unmarshal(yamlText, YamlPipelineProcessConfiguration.class, true);
+        return PROCESS_CONFIG_SWAPPER.swapToObject(yamlProcessConfig);
+    }
+    
+    @Override
+    public void persist(final JobType jobType, final PipelineProcessConfiguration processConfig) {
+        YamlPipelineProcessConfiguration yamlProcessConfig = PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig);
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistMetaDataProcessConfiguration(jobType, YamlEngine.marshal(yamlProcessConfig));
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
index 774ca9e6145..7079eeb5c9d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
@@ -24,22 +24,15 @@ import 
org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
 import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelCreator;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineProcessConfigurationUtils;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreatorFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithmFactory;
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
 import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
-import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineProcessConfigurationSwapper;
-
-import java.util.Properties;
 
 /**
  * Abstract pipeline process context.
@@ -48,8 +41,6 @@ import java.util.Properties;
 @Slf4j
 public abstract class AbstractPipelineProcessContext implements 
PipelineProcessContext {
     
-    private static final YamlPipelineProcessConfigurationSwapper SWAPPER = new 
YamlPipelineProcessConfigurationSwapper();
-    
     private final PipelineProcessConfiguration pipelineProcessConfig;
     
     private final JobRateLimitAlgorithm readRateLimitAlgorithm;
@@ -65,7 +56,7 @@ public abstract class AbstractPipelineProcessContext 
implements PipelineProcessC
     private final LazyInitializer<ExecuteEngine> 
importerExecuteEngineLazyInitializer;
     
     public AbstractPipelineProcessContext(final String jobId, final 
PipelineProcessConfiguration originalProcessConfig) {
-        PipelineProcessConfiguration processConfig = 
convertProcessConfig(originalProcessConfig);
+        PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
         this.pipelineProcessConfig = processConfig;
         PipelineReadConfiguration readConfig = processConfig.getRead();
         AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter();
@@ -98,24 +89,6 @@ public abstract class AbstractPipelineProcessContext 
implements PipelineProcessC
         };
     }
     
-    private PipelineProcessConfiguration convertProcessConfig(final 
PipelineProcessConfiguration originalProcessConfig) {
-        YamlPipelineProcessConfiguration yamlActionConfig = 
SWAPPER.swapToYamlConfiguration(originalProcessConfig);
-        if (null == yamlActionConfig.getRead()) {
-            
yamlActionConfig.setRead(YamlPipelineReadConfiguration.buildWithDefaultValue());
-        } else {
-            yamlActionConfig.getRead().fillInNullFieldsWithDefaultValue();
-        }
-        if (null == yamlActionConfig.getWrite()) {
-            
yamlActionConfig.setWrite(YamlPipelineWriteConfiguration.buildWithDefaultValue());
-        } else {
-            yamlActionConfig.getWrite().fillInNullFieldsWithDefaultValue();
-        }
-        if (null == yamlActionConfig.getStreamChannel()) {
-            yamlActionConfig.setStreamChannel(new 
YamlAlgorithmConfiguration(MemoryPipelineChannelCreator.TYPE, new 
Properties()));
-        }
-        return SWAPPER.swapToObject(yamlActionConfig);
-    }
-    
     /**
      * Get inventory dumper execute engine.
      *
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineMetaDataException.java
similarity index 57%
copy from 
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineMetaDataException.java
index 3cd18b1864e..88769d0b70b 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineMetaDataException.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+package org.apache.shardingsphere.data.pipeline.core.exception;
 
 /**
- * YAML pipeline process configuration.
+ * Pipeline meta data exception.
  */
-@Getter
-@Setter
-@ToString
-public final class YamlPipelineProcessConfiguration implements 
YamlConfiguration {
-    
-    private YamlPipelineReadConfiguration read;
+public final class PipelineMetaDataException extends RuntimeException {
     
-    private YamlPipelineWriteConfiguration write;
+    private static final long serialVersionUID = 1L;
     
-    private YamlAlgorithmConfiguration streamChannel;
+    public PipelineMetaDataException(final String message) {
+        super(message);
+    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index 9bd4767593e..e4e36bd727d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -47,7 +47,11 @@ public final class PipelineMetaDataNode {
     }
     
     private static String getMetaDataRootPath(final JobType jobType) {
-        return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, jobType.getLowercaseTypeName(), "metadata");
+        if (null != jobType) {
+            return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, jobType.getLowercaseTypeName(), "metadata");
+        } else {
+            return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, "metadata");
+        }
     }
     
     /**
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
new file mode 100644
index 00000000000..b267cfea044
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelCreator;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineProcessConfigurationSwapper;
+
+import java.util.Properties;
+
+/**
+ * Pipeline process configuration utils.
+ */
+public final class PipelineProcessConfigurationUtils {
+    
+    private static final YamlPipelineProcessConfigurationSwapper SWAPPER = new YamlPipelineProcessConfigurationSwapper();
+    
+    /**
+     * Convert with default value.
+     *
+     * @param originalConfig original process configuration, nullable
+     * @return process configuration
+     */
+    public static PipelineProcessConfiguration convertWithDefaultValue(final PipelineProcessConfiguration originalConfig) {
+        if (null != originalConfig && null != originalConfig.getRead() && null != originalConfig.getWrite() && null != originalConfig.getStreamChannel()) {
+            return originalConfig;
+        }
+        YamlPipelineProcessConfiguration yamlConfig = null != originalConfig ? SWAPPER.swapToYamlConfiguration(originalConfig) : new YamlPipelineProcessConfiguration();
+        if (null == yamlConfig.getRead()) {
+            yamlConfig.setRead(YamlPipelineReadConfiguration.buildWithDefaultValue());
+        } else {
+            yamlConfig.getRead().fillInNullFieldsWithDefaultValue();
+        }
+        if (null == yamlConfig.getWrite()) {
+            yamlConfig.setWrite(YamlPipelineWriteConfiguration.buildWithDefaultValue());
+        } else {
+            yamlConfig.getWrite().fillInNullFieldsWithDefaultValue();
+        }
+        if (null == yamlConfig.getStreamChannel()) {
+            yamlConfig.setStreamChannel(new YamlAlgorithmConfiguration(MemoryPipelineChannelCreator.TYPE, new Properties()));
+        }
+        return SWAPPER.swapToObject(yamlConfig);
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 06ce962775c..aef8bb6501a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -40,10 +40,9 @@ import 
org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineResourceAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
 import 
org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
-import 
org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineResourceAPIImpl;
+import 
org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
@@ -89,7 +88,12 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
     
     private final PipelineJobItemAPI jobItemAPI = new 
InventoryIncrementalJobItemAPIImpl();
     
-    private final PipelineResourceAPI pipelineResourceAPI = new 
PipelineResourceAPIImpl();
+    private final PipelineDataSourcePersistService dataSourcePersistService = 
new PipelineDataSourcePersistService();
+    
+    @Override
+    protected JobType getJobType() {
+        return JobType.MIGRATION;
+    }
     
     @Override
     protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
@@ -119,7 +123,7 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
     
     private String generateJobId(final YamlMigrationJobConfiguration config) {
         MigrationJobId jobId = new MigrationJobId();
-        jobId.setTypeCode(JobType.MIGRATION.getTypeCode());
+        jobId.setTypeCode(getJobType().getTypeCode());
         jobId.setFormatVersion(MigrationJobId.CURRENT_VERSION);
         jobId.setCurrentMetadataVersion(config.getActiveVersion());
         jobId.setNewMetadataVersion(config.getNewVersion());
@@ -402,11 +406,11 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
     }
     
     @Override
-    public void addMigrationSourceResources(final Map<String, 
DataSourceProperties> dataSourceProperties) {
-        log.info("Add migration source resources {}", 
dataSourceProperties.keySet());
-        Map<String, DataSourceProperties> existDataSources = 
pipelineResourceAPI.getMetaDataDataSource(JobType.MIGRATION);
-        Collection<String> duplicateDataSourceNames = new 
HashSet<>(dataSourceProperties.size(), 1);
-        for (Entry<String, DataSourceProperties> entry : 
dataSourceProperties.entrySet()) {
+    public void addMigrationSourceResources(final Map<String, 
DataSourceProperties> dataSourcePropsMap) {
+        log.info("Add migration source resources {}", 
dataSourcePropsMap.keySet());
+        Map<String, DataSourceProperties> existDataSources = 
dataSourcePersistService.load(getJobType());
+        Collection<String> duplicateDataSourceNames = new 
HashSet<>(dataSourcePropsMap.size(), 1);
+        for (Entry<String, DataSourceProperties> entry : 
dataSourcePropsMap.entrySet()) {
             if (existDataSources.containsKey(entry.getKey())) {
                 duplicateDataSourceNames.add(entry.getKey());
             }
@@ -415,13 +419,13 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
             throw new 
AddMigrationSourceResourceException(String.format("Duplicate resource names 
%s.", duplicateDataSourceNames));
         }
         Map<String, DataSourceProperties> result = new 
LinkedHashMap<>(existDataSources);
-        result.putAll(dataSourceProperties);
-        pipelineResourceAPI.persistMetaDataDataSource(JobType.MIGRATION, 
result);
+        result.putAll(dataSourcePropsMap);
+        dataSourcePersistService.persist(getJobType(), result);
     }
     
     @Override
     public void dropMigrationSourceResources(final Collection<String> 
resourceNames) {
-        Map<String, DataSourceProperties> metaDataDataSource = 
pipelineResourceAPI.getMetaDataDataSource(JobType.MIGRATION);
+        Map<String, DataSourceProperties> metaDataDataSource = 
dataSourcePersistService.load(getJobType());
         List<String> noExistResources = resourceNames.stream().filter(each -> 
!metaDataDataSource.containsKey(each)).collect(Collectors.toList());
         if (!noExistResources.isEmpty()) {
             throw new 
DropMigrationSourceResourceException(String.format("Resource names %s not 
exist.", resourceNames));
@@ -429,11 +433,11 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
         for (String each : resourceNames) {
             metaDataDataSource.remove(each);
         }
-        pipelineResourceAPI.persistMetaDataDataSource(JobType.MIGRATION, 
metaDataDataSource);
+        dataSourcePersistService.persist(getJobType(), metaDataDataSource);
     }
     
     @Override
     public String getType() {
-        return JobType.MIGRATION.getTypeName();
+        return getJobType().getTypeName();
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
index 81e59f59ed7..424ee393d5b 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
@@ -50,6 +50,19 @@ public final class MigrationJobAPIFixture implements 
MigrationJobAPI {
     public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration 
yamlJobConfig) {
     }
     
+    @Override
+    public void createProcessConfiguration(final PipelineProcessConfiguration 
processConfig) {
+    }
+    
+    @Override
+    public void alterProcessConfiguration(final PipelineProcessConfiguration 
processConfig) {
+    }
+    
+    @Override
+    public PipelineProcessConfiguration showProcessConfiguration() {
+        return null;
+    }
+    
     @Override
     public void startDisabledJob(final String jobId) {
     }
@@ -146,7 +159,7 @@ public final class MigrationJobAPIFixture implements 
MigrationJobAPI {
     }
     
     @Override
-    public void addMigrationSourceResources(final Map<String, 
DataSourceProperties> sourcePropertiesMap) {
+    public void addMigrationSourceResources(final Map<String, 
DataSourceProperties> dataSourcePropsMap) {
     }
     
     @Override
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index f56f0ce16bc..2b292de129b 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -31,7 +31,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineResourceAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
@@ -278,8 +277,8 @@ public final class MigrationJobAPIImplTest {
         Map<String, DataSourceProperties> expect = new LinkedHashMap<>(1, 1);
         expect.put("ds_0", new 
DataSourceProperties("com.zaxxer.hikari.HikariDataSource", props));
         jobAPI.addMigrationSourceResources(expect);
-        PipelineResourceAPI pipelineResourceAPI = new 
PipelineResourceAPIImpl();
-        Map<String, DataSourceProperties> actual = 
pipelineResourceAPI.getMetaDataDataSource(JobType.MIGRATION);
+        PipelineDataSourcePersistService persistService = new 
PipelineDataSourcePersistService();
+        Map<String, DataSourceProperties> actual = 
persistService.load(JobType.MIGRATION);
         assertTrue(actual.containsKey("ds_0"));
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
new file mode 100644
index 00000000000..c959637a971
--- /dev/null
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api.impl;
+
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelCreator;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineProcessConfigurationSwapper;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class PipelineProcessConfigurationPersistServiceTest {
+    
+    private static final YamlPipelineProcessConfigurationSwapper PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
+    
+    @BeforeClass
+    public static void beforeClass() {
+        PipelineContextUtil.mockModeConfigAndContextManager();
+    }
+    
+    @Test
+    public void assertLoadAndPersist() {
+        YamlPipelineProcessConfiguration yamlProcessConfig = new YamlPipelineProcessConfiguration();
+        YamlPipelineReadConfiguration yamlReadConfig = YamlPipelineReadConfiguration.buildWithDefaultValue();
+        yamlReadConfig.fillInNullFieldsWithDefaultValue();
+        yamlReadConfig.setShardingSize(10);
+        yamlProcessConfig.setRead(yamlReadConfig);
+        YamlPipelineWriteConfiguration yamlWriteConfig = YamlPipelineWriteConfiguration.buildWithDefaultValue();
+        yamlProcessConfig.setWrite(yamlWriteConfig);
+        YamlAlgorithmConfiguration yamlStreamChannel = new YamlAlgorithmConfiguration(MemoryPipelineChannelCreator.TYPE, new Properties());
+        yamlProcessConfig.setStreamChannel(yamlStreamChannel);
+        String expectedYamlText = YamlEngine.marshal(yamlProcessConfig);
+        PipelineProcessConfiguration processConfig = PROCESS_CONFIG_SWAPPER.swapToObject(yamlProcessConfig);
+        PipelineProcessConfigurationPersistService persistService = new PipelineProcessConfigurationPersistService();
+        JobType jobType = JobType.MIGRATION;
+        persistService.persist(jobType, processConfig);
+        String actualYamlText = YamlEngine.marshal(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(persistService.load(jobType)));
+        assertThat(actualYamlText, is(expectedYamlText));
+    }
+}

Reply via email to