This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new faf380b73a2 Refactor GovernanceRepositoryAPI (#29129)
faf380b73a2 is described below

commit faf380b73a280fcf94160d74cebcd3fd8246a5d3
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 23 12:11:49 2023 +0800

    Refactor GovernanceRepositoryAPI (#29129)
    
    * Add PipelineJobConfigurationGovernanceRepository
    
    * Add PipelineJobOffsetGovernanceRepository
    
    * Add PipelineJobItemProcessGovernanceRepository
    
    * Add PipelineJobItemErrorMessageGovernanceRepository
    
    * Add PipelineJobCheckGovernanceRepository
    
    * Add PipelineJobGovernanceRepository
    
    * Refactor PipelineJobOffsetGovernanceRepository
    
    * Add PipelineMetaDataDataSourceGovernanceRepository
    
    * Add PipelineMetaDataProcessConfigurationGovernanceRepository
    
    * Add PipelineMetaDataProcessConfigurationGovernanceRepository
    
    * Fix test case
    
    * Fix test case
---
 .../repository/GovernanceRepositoryAPI.java        | 206 ++++-----------------
 .../repository/GovernanceRepositoryAPIImpl.java    | 176 +++---------------
 .../PipelineJobCheckGovernanceRepository.java      | 134 ++++++++++++++
 ...pelineJobConfigurationGovernanceRepository.java |  53 ++++++
 .../PipelineJobGovernanceRepository.java           |  51 +++++
 ...ineJobItemErrorMessageGovernanceRepository.java |  53 ++++++
 ...PipelineJobItemProcessGovernanceRepository.java |  68 +++++++
 .../PipelineJobOffsetGovernanceRepository.java     |  71 +++++++
 ...lineMetaDataDataSourceGovernanceRepository.java |  51 +++++
 ...taProcessConfigurationGovernanceRepository.java |  51 +++++
 .../service/PipelineJobIteErrorMessageManager.java |   6 +-
 .../core/job/service/PipelineJobItemManager.java   |  10 +-
 .../core/job/service/PipelineJobManager.java       |  12 +-
 .../metadata/PipelineDataSourcePersistService.java |   4 +-
 ...PipelineProcessConfigurationPersistService.java |   4 +-
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |   8 +-
 .../api/impl/ConsistencyCheckJobAPI.java           |  24 +--
 .../task/ConsistencyCheckTasksRunner.java          |   3 +-
 .../migration/api/impl/MigrationJobAPI.java        |   2 +-
 .../migration/prepare/MigrationJobPreparer.java    |   4 +-
 .../service/GovernanceRepositoryAPIImplTest.java   | 113 ++++++-----
 .../consistencycheck/ConsistencyCheckJobTest.java  |   2 +-
 .../api/impl/ConsistencyCheckJobAPITest.java       |   8 +-
 .../migration/api/impl/MigrationJobAPITest.java    |   4 +-
 .../MigrationDataConsistencyCheckerTest.java       |   2 +-
 25 files changed, 688 insertions(+), 432 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
index 87496931229..930d24226b0 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
@@ -17,215 +17,73 @@
 
 package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
 
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
 /**
  * Governance repository API.
  */
 public interface GovernanceRepositoryAPI {
     
     /**
-     * Watch pipeLine root path.
-     *
-     * @param listener data changed event listener
-     */
-    void watchPipeLineRootPath(DataChangedEventListener listener);
-    
-    /**
-     * Whether job configuration existed.
-     *
-     * @param jobId jobId
-     * @return job configuration exist or not
-     */
-    boolean isJobConfigurationExisted(String jobId);
-    
-    /**
-     * Persist job offset info.
-     *
-     * @param jobId job id
-     * @param jobOffsetInfo job offset info
-     */
-    void persistJobOffsetInfo(String jobId, JobOffsetInfo jobOffsetInfo);
-    
-    /**
-     * Get job offset info.
-     *
-     * @param jobId job id
-     * @return job offset info
-     */
-    JobOffsetInfo getJobOffsetInfo(String jobId);
-    
-    /**
-     * Persist job item progress.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @param progressValue progress value
-     */
-    void persistJobItemProgress(String jobId, int shardingItem, String 
progressValue);
-    
-    /**
-     * Update job item progress.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @param progressValue progress value
-     */
-    void updateJobItemProgress(String jobId, int shardingItem, String 
progressValue);
-    
-    /**
-     * Get job item progress.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @return job item progress
-     */
-    Optional<String> getJobItemProgress(String jobId, int shardingItem);
-    
-    /**
-     * Get latest check job id.
-     *
-     * @param parentJobId parent job id
-     * @return check job id
-     */
-    Optional<String> getLatestCheckJobId(String parentJobId);
-    
-    /**
-     * Persist latest check job id.
-     *
-     * @param parentJobId job id
-     * @param checkJobId check job id
-     */
-    void persistLatestCheckJobId(String parentJobId, String checkJobId);
-    
-    /**
-     * Delete latest check job id.
-     *
-     * @param parentJobId parent job id
-     */
-    void deleteLatestCheckJobId(String parentJobId);
-    
-    /**
-     * Get check job result.
-     *
-     * @param parentJobId parent job id
-     * @param checkJobId check job id
-     * @return check job result
-     */
-    Map<String, TableDataConsistencyCheckResult> getCheckJobResult(String 
parentJobId, String checkJobId);
-    
-    /**
-     * Persist check job result.
-     *
-     * @param parentJobId parent job id
-     * @param checkJobId check job id
-     * @param checkResultMap check result map
-     */
-    void persistCheckJobResult(String parentJobId, String checkJobId, 
Map<String, TableDataConsistencyCheckResult> checkResultMap);
-    
-    /**
-     * Delete check job result.
-     *
-     * @param parentJobId parent job id
-     * @param checkJobId check job id
-     */
-    void deleteCheckJobResult(String parentJobId, String checkJobId);
-    
-    /**
-     * List check job ids.
-     *
-     * @param parentJobId parent job id
-     * @return check job ids
-     */
-    Collection<String> listCheckJobIds(String parentJobId);
-    
-    /**
-     * Delete job.
-     *
-     * @param jobId job id
-     */
-    void deleteJob(String jobId);
-    
-    /**
-     * Persist job root info.
-     *
-     * @param jobId job ID
-     * @param jobClass job class
+     * Get job configuration governance repository.
+     * 
+     * @return job configuration governance repository
      */
-    void persistJobRootInfo(String jobId, Class<? extends PipelineJob> 
jobClass);
+    PipelineJobConfigurationGovernanceRepository 
getJobConfigurationGovernanceRepository();
     
     /**
-     * Persist job configuration.
+     * Get job offset governance repository.
      * 
-     * @param jobId job ID
-     * @param jobConfigPOJO job configuration POJO
+     * @return job offset governance repository
      */
-    void persistJobConfiguration(String jobId, JobConfigurationPOJO 
jobConfigPOJO);
+    PipelineJobOffsetGovernanceRepository getJobOffsetGovernanceRepository();
     
     /**
-     * Update job item error message.
+     * Get job item process governance repository.
      *
-     * @param jobId job ID
-     * @param shardingItem sharding item
-     * @param errorMessage error message
+     * @return job item process governance repository
      */
-    void updateJobItemErrorMessage(String jobId, int shardingItem, String 
errorMessage);
+    PipelineJobItemProcessGovernanceRepository 
getJobItemProcessGovernanceRepository();
     
     /**
-     * Get sharding items of job.
-     *
-     * @param jobId job id
-     * @return sharding items
+     * Get job item error message governance repository.
+     * 
+     * @return job item error message governance repository
      */
-    List<Integer> getShardingItems(String jobId);
+    PipelineJobItemErrorMessageGovernanceRepository 
getJobItemErrorMessageGovernanceRepository();
     
     /**
-     * Get meta data data sources.
-     *
-     * @param jobType job type
-     * @return data source properties
+     * Get job check governance repository.
+     * 
+     * @return job check governance repository
      */
-    String getMetaDataDataSources(String jobType);
+    PipelineJobCheckGovernanceRepository getJobCheckGovernanceRepository();
     
     /**
-     * Persist meta data data sources.
-     *
-     * @param jobType job type
-     * @param metaDataDataSources data source properties
+     * Get job governance repository.
+     * 
+     * @return job governance repository
      */
-    void persistMetaDataDataSources(String jobType, String 
metaDataDataSources);
+    PipelineJobGovernanceRepository getJobGovernanceRepository();
     
     /**
-     * Get meta data process configuration.
+     * Get meta data data source governance repository.
      *
-     * @param jobType job type, nullable
-     * @return process configuration YAML text
+     * @return meta data data source governance repository
      */
-    String getMetaDataProcessConfiguration(String jobType);
+    PipelineMetaDataDataSourceGovernanceRepository 
getMetaDataDataSourceGovernanceRepository();
     
     /**
-     * Persist meta data process configuration.
-     *
-     * @param jobType job type, nullable
-     * @param processConfigYamlText process configuration YAML text
+     * Get meta data process configuration governance repository.
+     * 
+     * @return meta data process configuration governance repository
      */
-    void persistMetaDataProcessConfiguration(String jobType, String 
processConfigYamlText);
+    PipelineMetaDataProcessConfigurationGovernanceRepository 
getMetaDataProcessConfigurationGovernanceRepository();
     
     /**
-     * Get job item error msg.
+     * Watch pipeLine root path.
      *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @return error msg
+     * @param listener data changed event listener
      */
-    String getJobItemErrorMessage(String jobId, int shardingItem);
+    void watchPipeLineRootPath(DataChangedEventListener listener);
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
index 1ea1dee6c94..2605c893bfb 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
@@ -17,181 +17,49 @@
 
 package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
 
-import com.google.common.base.Strings;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import lombok.Getter;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
 /**
  * Governance repository API impl.
  */
-@RequiredArgsConstructor
-@Slf4j
+@Getter
 public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAPI {
     
     private final ClusterPersistRepository repository;
     
-    @Override
-    public void watchPipeLineRootPath(final DataChangedEventListener listener) 
{
-        repository.watch(PipelineNodePath.DATA_PIPELINE_ROOT, listener);
-    }
-    
-    @Override
-    public boolean isJobConfigurationExisted(final String jobId) {
-        return null != 
repository.getDirectly(PipelineMetaDataNode.getJobConfigurationPath(jobId));
-    }
-    
-    @Override
-    public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo 
jobOffsetInfo) {
-        repository.persist(PipelineMetaDataNode.getJobOffsetPath(jobId), 
YamlEngine.marshal(new 
YamlJobOffsetInfoSwapper().swapToYamlConfiguration(jobOffsetInfo)));
-    }
-    
-    @Override
-    public JobOffsetInfo getJobOffsetInfo(final String jobId) {
-        String value = 
repository.getDirectly(PipelineMetaDataNode.getJobOffsetPath(jobId));
-        return new 
YamlJobOffsetInfoSwapper().swapToObject(Strings.isNullOrEmpty(value) ? new 
YamlJobOffsetInfo() : YamlEngine.unmarshal(value, YamlJobOffsetInfo.class));
-    }
-    
-    @Override
-    public void persistJobItemProgress(final String jobId, final int 
shardingItem, final String progressValue) {
-        repository.persist(PipelineMetaDataNode.getJobOffsetItemPath(jobId, 
shardingItem), progressValue);
-    }
-    
-    @Override
-    public void updateJobItemProgress(final String jobId, final int 
shardingItem, final String progressValue) {
-        repository.update(PipelineMetaDataNode.getJobOffsetItemPath(jobId, 
shardingItem), progressValue);
-    }
-    
-    @Override
-    public Optional<String> getJobItemProgress(final String jobId, final int 
shardingItem) {
-        String text = 
repository.getDirectly(PipelineMetaDataNode.getJobOffsetItemPath(jobId, 
shardingItem));
-        return Strings.isNullOrEmpty(text) ? Optional.empty() : 
Optional.of(text);
-    }
-    
-    @Override
-    public Optional<String> getLatestCheckJobId(final String parentJobId) {
-        return 
Optional.ofNullable(repository.getDirectly(PipelineMetaDataNode.getLatestCheckJobIdPath(parentJobId)));
-    }
-    
-    @Override
-    public void persistLatestCheckJobId(final String parentJobId, final String 
checkJobId) {
-        
repository.persist(PipelineMetaDataNode.getLatestCheckJobIdPath(parentJobId), 
String.valueOf(checkJobId));
-    }
-    
-    @Override
-    public void deleteLatestCheckJobId(final String parentJobId) {
-        
repository.delete(PipelineMetaDataNode.getLatestCheckJobIdPath(parentJobId));
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public Map<String, TableDataConsistencyCheckResult> 
getCheckJobResult(final String parentJobId, final String checkJobId) {
-        String yamlCheckResultMapText = 
repository.getDirectly(PipelineMetaDataNode.getCheckJobResultPath(parentJobId, 
checkJobId));
-        if (Strings.isNullOrEmpty(yamlCheckResultMapText)) {
-            return Collections.emptyMap();
-        }
-        YamlTableDataConsistencyCheckResultSwapper swapper = new 
YamlTableDataConsistencyCheckResultSwapper();
-        Map<String, String> yamlCheckResultMap = 
YamlEngine.unmarshal(yamlCheckResultMapText, Map.class, true);
-        Map<String, TableDataConsistencyCheckResult> result = new 
HashMap<>(yamlCheckResultMap.size(), 1F);
-        for (Entry<String, String> entry : yamlCheckResultMap.entrySet()) {
-            result.put(entry.getKey(), swapper.swapToObject(entry.getValue()));
-        }
-        return result;
-    }
-    
-    @Override
-    public void persistCheckJobResult(final String parentJobId, final String 
checkJobId, final Map<String, TableDataConsistencyCheckResult> checkResultMap) {
-        if (null == checkResultMap) {
-            return;
-        }
-        Map<String, String> yamlCheckResultMap = new LinkedHashMap<>();
-        for (Entry<String, TableDataConsistencyCheckResult> entry : 
checkResultMap.entrySet()) {
-            YamlTableDataConsistencyCheckResult yamlCheckResult = new 
YamlTableDataConsistencyCheckResultSwapper().swapToYamlConfiguration(entry.getValue());
-            yamlCheckResultMap.put(entry.getKey(), 
YamlEngine.marshal(yamlCheckResult));
-        }
-        
repository.persist(PipelineMetaDataNode.getCheckJobResultPath(parentJobId, 
checkJobId), YamlEngine.marshal(yamlCheckResultMap));
-    }
-    
-    @Override
-    public void deleteCheckJobResult(final String parentJobId, final String 
checkJobId) {
-        
repository.delete(PipelineMetaDataNode.getCheckJobResultPath(parentJobId, 
checkJobId));
-    }
-    
-    @Override
-    public Collection<String> listCheckJobIds(final String parentJobId) {
-        return 
repository.getChildrenKeys(PipelineMetaDataNode.getCheckJobIdsRootPath(parentJobId));
-    }
-    
-    @Override
-    public void deleteJob(final String jobId) {
-        repository.delete(PipelineMetaDataNode.getJobRootPath(jobId));
-    }
+    private final PipelineJobConfigurationGovernanceRepository 
jobConfigurationGovernanceRepository;
     
-    @Override
-    public void persistJobRootInfo(final String jobId, final Class<? extends 
PipelineJob> jobClass) {
-        repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), 
jobClass.getName());
-    }
+    private final PipelineJobOffsetGovernanceRepository 
jobOffsetGovernanceRepository;
     
-    @Override
-    public void persistJobConfiguration(final String jobId, final 
JobConfigurationPOJO jobConfigPOJO) {
-        
repository.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId), 
YamlEngine.marshal(jobConfigPOJO));
-    }
+    private final PipelineJobItemProcessGovernanceRepository 
jobItemProcessGovernanceRepository;
     
-    @Override
-    public void updateJobItemErrorMessage(final String jobId, final int 
shardingItem, final String errorMessage) {
-        
repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem), errorMessage);
-    }
+    private final PipelineJobItemErrorMessageGovernanceRepository 
jobItemErrorMessageGovernanceRepository;
     
-    @Override
-    public List<Integer> getShardingItems(final String jobId) {
-        List<String> result = 
repository.getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
-        return 
result.stream().map(Integer::parseInt).collect(Collectors.toList());
-    }
+    private final PipelineJobCheckGovernanceRepository 
jobCheckGovernanceRepository;
     
-    @Override
-    public String getMetaDataDataSources(final String jobType) {
-        return 
repository.getDirectly(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType));
-    }
+    private final PipelineJobGovernanceRepository jobGovernanceRepository;
     
-    @Override
-    public void persistMetaDataDataSources(final String jobType, final String 
metaDataDataSources) {
-        
repository.persist(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType), 
metaDataDataSources);
-    }
+    private final PipelineMetaDataDataSourceGovernanceRepository 
metaDataDataSourceGovernanceRepository;
     
-    @Override
-    public String getMetaDataProcessConfiguration(final String jobType) {
-        return 
repository.getDirectly(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType));
-    }
+    private final PipelineMetaDataProcessConfigurationGovernanceRepository 
metaDataProcessConfigurationGovernanceRepository;
     
-    @Override
-    public void persistMetaDataProcessConfiguration(final String jobType, 
final String processConfigYamlText) {
-        
repository.persist(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType), 
processConfigYamlText);
+    public GovernanceRepositoryAPIImpl(final ClusterPersistRepository 
repository) {
+        this.repository = repository;
+        jobConfigurationGovernanceRepository = new 
PipelineJobConfigurationGovernanceRepository(repository);
+        jobOffsetGovernanceRepository = new 
PipelineJobOffsetGovernanceRepository(repository);
+        jobItemProcessGovernanceRepository = new 
PipelineJobItemProcessGovernanceRepository(repository);
+        jobItemErrorMessageGovernanceRepository = new 
PipelineJobItemErrorMessageGovernanceRepository(repository);
+        jobCheckGovernanceRepository = new 
PipelineJobCheckGovernanceRepository(repository);
+        jobGovernanceRepository = new 
PipelineJobGovernanceRepository(repository);
+        metaDataDataSourceGovernanceRepository = new 
PipelineMetaDataDataSourceGovernanceRepository(repository);
+        metaDataProcessConfigurationGovernanceRepository = new 
PipelineMetaDataProcessConfigurationGovernanceRepository(repository);
     }
     
     @Override
-    public String getJobItemErrorMessage(final String jobId, final int 
shardingItem) {
-        return 
repository.getDirectly(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem));
+    public void watchPipeLineRootPath(final DataChangedEventListener listener) 
{
+        repository.watch(PipelineNodePath.DATA_PIPELINE_ROOT, listener);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobCheckGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobCheckGovernanceRepository.java
new file mode 100644
index 00000000000..e5c44dfdd3c
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobCheckGovernanceRepository.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+
+import com.google.common.base.Strings;
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+/**
+ * Pipeline job check governance repository.
+ */
+@RequiredArgsConstructor
+public final class PipelineJobCheckGovernanceRepository {
+    
+    private final ClusterPersistRepository repository;
+    
+    /**
+     * Get latest check job id.
+     *
+     * @param parentJobId parent job id
+     * @return check job id
+     */
+    public Optional<String> getLatestCheckJobId(final String parentJobId) {
+        return 
Optional.ofNullable(repository.getDirectly(PipelineMetaDataNode.getLatestCheckJobIdPath(parentJobId)));
+    }
+    
+    /**
+     * Persist latest check job id.
+     *
+     * @param parentJobId job id
+     * @param checkJobId check job id
+     */
+    public void persistLatestCheckJobId(final String parentJobId, final String 
checkJobId) {
+        
repository.persist(PipelineMetaDataNode.getLatestCheckJobIdPath(parentJobId), 
String.valueOf(checkJobId));
+    }
+    
+    /**
+     * Delete latest check job id.
+     *
+     * @param parentJobId parent job id
+     */
+    public void deleteLatestCheckJobId(final String parentJobId) {
+        
repository.delete(PipelineMetaDataNode.getLatestCheckJobIdPath(parentJobId));
+    }
+    
+    /**
+     * Get check job result.
+     *
+     * @param parentJobId parent job id
+     * @param checkJobId check job id
+     * @return check job result
+     */
+    @SuppressWarnings("unchecked")
+    public Map<String, TableDataConsistencyCheckResult> 
getCheckJobResult(final String parentJobId, final String checkJobId) {
+        String yamlCheckResultMapText = 
repository.getDirectly(PipelineMetaDataNode.getCheckJobResultPath(parentJobId, 
checkJobId));
+        if (Strings.isNullOrEmpty(yamlCheckResultMapText)) {
+            return Collections.emptyMap();
+        }
+        YamlTableDataConsistencyCheckResultSwapper swapper = new 
YamlTableDataConsistencyCheckResultSwapper();
+        Map<String, String> yamlCheckResultMap = 
YamlEngine.unmarshal(yamlCheckResultMapText, Map.class, true);
+        Map<String, TableDataConsistencyCheckResult> result = new 
HashMap<>(yamlCheckResultMap.size(), 1F);
+        for (Entry<String, String> entry : yamlCheckResultMap.entrySet()) {
+            result.put(entry.getKey(), swapper.swapToObject(entry.getValue()));
+        }
+        return result;
+    }
+    
+    /**
+     * Persist check job result.
+     *
+     * @param parentJobId parent job id
+     * @param checkJobId check job id
+     * @param checkResultMap check result map
+     */
+    public void persistCheckJobResult(final String parentJobId, final String 
checkJobId, final Map<String, TableDataConsistencyCheckResult> checkResultMap) {
+        if (null == checkResultMap) {
+            return;
+        }
+        Map<String, String> yamlCheckResultMap = new LinkedHashMap<>();
+        for (Entry<String, TableDataConsistencyCheckResult> entry : 
checkResultMap.entrySet()) {
+            YamlTableDataConsistencyCheckResult yamlCheckResult = new 
YamlTableDataConsistencyCheckResultSwapper().swapToYamlConfiguration(entry.getValue());
+            yamlCheckResultMap.put(entry.getKey(), 
YamlEngine.marshal(yamlCheckResult));
+        }
+        
repository.persist(PipelineMetaDataNode.getCheckJobResultPath(parentJobId, 
checkJobId), YamlEngine.marshal(yamlCheckResultMap));
+    }
+    
+    /**
+     * Delete check job result.
+     *
+     * @param parentJobId parent job id
+     * @param checkJobId check job id
+     */
+    public void deleteCheckJobResult(final String parentJobId, final String 
checkJobId) {
+        
repository.delete(PipelineMetaDataNode.getCheckJobResultPath(parentJobId, 
checkJobId));
+    }
+    
+    /**
+     * List check job ids.
+     *
+     * @param parentJobId parent job id
+     * @return check job ids
+     */
+    public Collection<String> listCheckJobIds(final String parentJobId) {
+        return 
repository.getChildrenKeys(PipelineMetaDataNode.getCheckJobIdsRootPath(parentJobId));
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobConfigurationGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobConfigurationGovernanceRepository.java
new file mode 100644
index 00000000000..9b6d13bf983
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobConfigurationGovernanceRepository.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+/**
+ * Pipeline job configuration governance repository.
+ */
+@RequiredArgsConstructor
+public final class PipelineJobConfigurationGovernanceRepository {
+    
+    private final ClusterPersistRepository repository;
+    
+    /**
+     * Whether pipeline job configuration existed.
+     *
+     * @param jobId jobId
+     * @return pipeline job configuration exists or not
+     */
+    public boolean isExisted(final String jobId) {
+        return null != 
repository.getDirectly(PipelineMetaDataNode.getJobConfigurationPath(jobId));
+    }
+    
+    /**
+     * Persist pipeline job configuration.
+     * 
+     * @param jobId job ID
+     * @param jobConfigPOJO job configuration POJO
+     */
+    public void persist(final String jobId, final JobConfigurationPOJO 
jobConfigPOJO) {
+        
repository.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId), 
YamlEngine.marshal(jobConfigPOJO));
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
new file mode 100644
index 00000000000..a403f0f7b0e
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+/**
+ * Pipeline job governance repository.
+ */
+@RequiredArgsConstructor
+public final class PipelineJobGovernanceRepository {
+    
+    private final ClusterPersistRepository repository;
+    
+    /**
+     * Create job.
+     *
+     * @param jobId job ID
+     * @param jobClass job class
+     */
+    public void create(final String jobId, final Class<? extends PipelineJob> 
jobClass) {
+        repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), 
jobClass.getName());
+    }
+    
+    /**
+     * Delete job.
+     *
+     * @param jobId job id
+     */
+    public void delete(final String jobId) {
+        repository.delete(PipelineMetaDataNode.getJobRootPath(jobId));
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemErrorMessageGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemErrorMessageGovernanceRepository.java
new file mode 100644
index 00000000000..eee1bd77b57
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemErrorMessageGovernanceRepository.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+/**
+ * Pipeline job item error message governance repository.
+ */
+@RequiredArgsConstructor
+public final class PipelineJobItemErrorMessageGovernanceRepository {
+    
+    private final ClusterPersistRepository repository;
+    
+    /**
+     * Update job item error message.
+     *
+     * @param jobId job ID
+     * @param shardingItem sharding item
+     * @param errorMessage error message
+     */
+    public void update(final String jobId, final int shardingItem, final 
String errorMessage) {
+        
repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem), errorMessage);
+    }
+    
+    /**
+     * Load job item error msg.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @return error msg
+     */
+    public String load(final String jobId, final int shardingItem) {
+        return 
repository.getDirectly(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem));
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemProcessGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemProcessGovernanceRepository.java
new file mode 100644
index 00000000000..7ea4887fdb8
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemProcessGovernanceRepository.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+
+import com.google.common.base.Strings;
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+import java.util.Optional;
+
+/**
+ * Pipeline job item process governance repository.
+ */
+@RequiredArgsConstructor
+public final class PipelineJobItemProcessGovernanceRepository {
+    
+    private final ClusterPersistRepository repository;
+    
+    /**
+     * Persist job item progress.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param progressValue progress value
+     */
+    public void persist(final String jobId, final int shardingItem, final 
String progressValue) {
+        repository.persist(PipelineMetaDataNode.getJobOffsetItemPath(jobId, 
shardingItem), progressValue);
+    }
+    
+    /**
+     * Update job item progress.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param progressValue progress value
+     */
+    public void update(final String jobId, final int shardingItem, final 
String progressValue) {
+        repository.update(PipelineMetaDataNode.getJobOffsetItemPath(jobId, 
shardingItem), progressValue);
+    }
+    
+    /**
+     * Load job item progress.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @return job item progress
+     */
+    public Optional<String> load(final String jobId, final int shardingItem) {
+        String text = 
repository.getDirectly(PipelineMetaDataNode.getJobOffsetItemPath(jobId, 
shardingItem));
+        return Strings.isNullOrEmpty(text) ? Optional.empty() : 
Optional.of(text);
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobOffsetGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobOffsetGovernanceRepository.java
new file mode 100644
index 00000000000..412f78e4e6e
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobOffsetGovernanceRepository.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+
+import com.google.common.base.Strings;
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Pipeline job offset governance repository.
+ */
+@RequiredArgsConstructor
+public final class PipelineJobOffsetGovernanceRepository {
+    
+    private final ClusterPersistRepository repository;
+    
+    /**
+     * Persist job offset info.
+     *
+     * @param jobId job id
+     * @param jobOffsetInfo job offset info
+     */
+    public void persist(final String jobId, final JobOffsetInfo jobOffsetInfo) 
{
+        repository.persist(PipelineMetaDataNode.getJobOffsetPath(jobId), 
YamlEngine.marshal(new 
YamlJobOffsetInfoSwapper().swapToYamlConfiguration(jobOffsetInfo)));
+    }
+    
+    /**
+     * Load job offset info.
+     *
+     * @param jobId job id
+     * @return job offset info
+     */
+    public JobOffsetInfo load(final String jobId) {
+        String value = 
repository.getDirectly(PipelineMetaDataNode.getJobOffsetPath(jobId));
+        return new 
YamlJobOffsetInfoSwapper().swapToObject(Strings.isNullOrEmpty(value) ? new 
YamlJobOffsetInfo() : YamlEngine.unmarshal(value, YamlJobOffsetInfo.class));
+    }
+    
+    /**
+     * Get sharding items of job.
+     *
+     * @param jobId job id
+     * @return sharding items
+     */
+    public List<Integer> getShardingItems(final String jobId) {
+        List<String> result = 
repository.getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
+        return 
result.stream().map(Integer::parseInt).collect(Collectors.toList());
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataDataSourceGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataDataSourceGovernanceRepository.java
new file mode 100644
index 00000000000..870cc0c20dc
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataDataSourceGovernanceRepository.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+/**
+ * Pipeline meta data data source governance repository.
+ */
+@RequiredArgsConstructor
+public final class PipelineMetaDataDataSourceGovernanceRepository {
+    
+    private final ClusterPersistRepository repository;
+    
+    /**
+     * Persist meta data data sources.
+     *
+     * @param jobType job type
+     * @param metaDataDataSources data source properties
+     */
+    public void persist(final String jobType, final String 
metaDataDataSources) {
+        
repository.persist(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType), 
metaDataDataSources);
+    }
+    
+    /**
+     * Load meta data data sources.
+     *
+     * @param jobType job type
+     * @return data source properties
+     */
+    public String load(final String jobType) {
+        return 
repository.getDirectly(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType));
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataProcessConfigurationGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataProcessConfigurationGovernanceRepository.java
new file mode 100644
index 00000000000..e6f3e1ffb58
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataProcessConfigurationGovernanceRepository.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+/**
+ * Pipeline meta data process configuration governance repository.
+ */
+@RequiredArgsConstructor
+public final class PipelineMetaDataProcessConfigurationGovernanceRepository {
+    
+    private final ClusterPersistRepository repository;
+    
+    /**
+     * Persist meta data process configuration.
+     *
+     * @param jobType job type, nullable
+     * @param processConfigYamlText process configuration YAML text
+     */
+    public void persist(final String jobType, final String 
processConfigYamlText) {
+        
repository.persist(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType), 
processConfigYamlText);
+    }
+    
+    /**
+     * Load meta data process configuration.
+     *
+     * @param jobType job type, nullable
+     * @return process configuration YAML text
+     */
+    public String load(final String jobType) {
+        return 
repository.getDirectly(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType));
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
index 669d7eebbfe..879b241e58e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
@@ -46,7 +46,7 @@ public final class PipelineJobIteErrorMessageManager {
      * @return map, key is sharding item, value is error message
      */
     public String getErrorMessage() {
-        return 
Optional.ofNullable(governanceRepositoryAPI.getJobItemErrorMessage(jobId, 
shardingItem)).orElse("");
+        return 
Optional.ofNullable(governanceRepositoryAPI.getJobItemErrorMessageGovernanceRepository().load(jobId,
 shardingItem)).orElse("");
     }
     
     /**
@@ -55,7 +55,7 @@ public final class PipelineJobIteErrorMessageManager {
      * @param error error
      */
     public void updateErrorMessage(final Object error) {
-        governanceRepositoryAPI.updateJobItemErrorMessage(jobId, shardingItem, 
null == error ? "" : buildErrorMessage(error));
+        
governanceRepositoryAPI.getJobItemErrorMessageGovernanceRepository().update(jobId,
 shardingItem, null == error ? "" : buildErrorMessage(error));
     }
     
     private String buildErrorMessage(final Object error) {
@@ -66,6 +66,6 @@ public final class PipelineJobIteErrorMessageManager {
      * Clean job item error message.
      */
     public void cleanErrorMessage() {
-        governanceRepositoryAPI.updateJobItemErrorMessage(jobId, shardingItem, 
"");
+        
governanceRepositoryAPI.getJobItemErrorMessageGovernanceRepository().update(jobId,
 shardingItem, "");
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
index c6e6ec47e5a..78a880c68c9 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
@@ -54,8 +54,8 @@ public final class PipelineJobItemManager<T extends 
PipelineJobItemProgress> {
             return;
         }
         jobItemProgress.get().setStatus(status);
-        PipelineAPIFactory.getGovernanceRepositoryAPI(
-                
PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId, 
shardingItem, 
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId))
+                .getJobItemProcessGovernanceRepository().update(jobId, 
shardingItem, 
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
     }
     
     /**
@@ -66,7 +66,7 @@ public final class PipelineJobItemManager<T extends 
PipelineJobItemProgress> {
      * @return job item progress
      */
     public Optional<T> getProgress(final String jobId, final int shardingItem) 
{
-        return 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId,
 shardingItem)
+        return 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().load(jobId,
 shardingItem)
                 .map(optional -> 
swapper.swapToObject(YamlEngine.unmarshal(optional, 
swapper.getYamlProgressClass(), true)));
     }
     
@@ -77,7 +77,7 @@ public final class PipelineJobItemManager<T extends 
PipelineJobItemProgress> {
      */
     public void persistProgress(final PipelineJobItemContext jobItemContext) {
         
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
-                .persistJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
+                
.getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
     }
     
     /**
@@ -87,7 +87,7 @@ public final class PipelineJobItemManager<T extends 
PipelineJobItemProgress> {
      */
     public void updateProgress(final PipelineJobItemContext jobItemContext) {
         
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
-                .updateJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
+                
.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
     }
     
     @SuppressWarnings("unchecked")
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 9dfa1c4f62a..a3a89ccf6e6 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -75,12 +75,12 @@ public final class PipelineJobManager {
         String jobId = jobConfig.getJobId();
         ShardingSpherePreconditions.checkState(0 != 
jobConfig.getJobShardingCount(), () -> new 
PipelineJobCreationWithInvalidShardingCountException(jobId));
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
-        if (repositoryAPI.isJobConfigurationExisted(jobId)) {
+        if 
(repositoryAPI.getJobConfigurationGovernanceRepository().isExisted(jobId)) {
             log.warn("jobId already exists in registry center, ignore, job id 
is `{}`", jobId);
             return Optional.of(jobId);
         }
-        repositoryAPI.persistJobRootInfo(jobId, jobAPI.getJobClass());
-        repositoryAPI.persistJobConfiguration(jobId, 
jobConfig.convertToJobConfigurationPOJO());
+        repositoryAPI.getJobGovernanceRepository().create(jobId, 
jobAPI.getJobClass());
+        repositoryAPI.getJobConfigurationGovernanceRepository().persist(jobId, 
jobConfig.convertToJobConfigurationPOJO());
         return Optional.of(jobId);
     }
     
@@ -119,7 +119,7 @@ public final class PipelineJobManager {
     }
     
     private void startNextDisabledJob(final String jobId, final String 
toBeStartDisabledNextJobType) {
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional
 -> {
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional
 -> {
             try {
                 new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, 
toBeStartDisabledNextJobType)).startDisabledJob(optional);
                 // CHECKSTYLE:OFF
@@ -141,7 +141,7 @@ public final class PipelineJobManager {
     }
     
     private void stopPreviousJob(final String jobId, final String 
toBeStoppedPreviousJobType) {
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional
 -> {
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional
 -> {
             try {
                 new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, 
toBeStoppedPreviousJobType)).stop(optional);
                 // CHECKSTYLE:OFF
@@ -176,7 +176,7 @@ public final class PipelineJobManager {
     public void drop(final String jobId) {
         PipelineContextKey contextKey = 
PipelineJobIdUtils.parseContextKey(jobId);
         
PipelineAPIFactory.getJobOperateAPI(contextKey).remove(String.valueOf(jobId), 
null);
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).deleteJob(jobId);
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getJobGovernanceRepository().delete(jobId);
     }
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
index 00119f7d408..42e0035890d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
@@ -39,7 +39,7 @@ public final class PipelineDataSourcePersistService 
implements PipelineMetaDataP
     @Override
     @SuppressWarnings("unchecked")
     public Map<String, DataSourcePoolProperties> load(final PipelineContextKey 
contextKey, final String jobType) {
-        String dataSourcesProps = 
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataDataSources(jobType);
+        String dataSourcesProps = 
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataDataSourceGovernanceRepository().load(jobType);
         if (Strings.isNullOrEmpty(dataSourcesProps)) {
             return Collections.emptyMap();
         }
@@ -55,6 +55,6 @@ public final class PipelineDataSourcePersistService 
implements PipelineMetaDataP
         for (Entry<String, DataSourcePoolProperties> entry : 
propsMap.entrySet()) {
             dataSourceMap.put(entry.getKey(), 
swapper.swapToMap(entry.getValue()));
         }
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).persistMetaDataDataSources(jobType,
 YamlEngine.marshal(dataSourceMap));
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataDataSourceGovernanceRepository().persist(jobType,
 YamlEngine.marshal(dataSourceMap));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
index 4df71589bf2..b20a5e22935 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
@@ -34,7 +34,7 @@ public final class PipelineProcessConfigurationPersistService 
implements Pipelin
     
     @Override
     public PipelineProcessConfiguration load(final PipelineContextKey 
contextKey, final String jobType) {
-        String yamlText = 
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataProcessConfiguration(jobType);
+        String yamlText = 
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataProcessConfigurationGovernanceRepository().load(jobType);
         if (Strings.isNullOrEmpty(yamlText)) {
             return null;
         }
@@ -45,6 +45,6 @@ public final class PipelineProcessConfigurationPersistService 
implements Pipelin
     @Override
     public void persist(final PipelineContextKey contextKey, final String 
jobType, final PipelineProcessConfiguration processConfig) {
         String yamlText = 
YamlEngine.marshal(swapper.swapToYamlConfiguration(processConfig));
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).persistMetaDataProcessConfiguration(jobType,
 yamlText);
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataProcessConfigurationGovernanceRepository().persist(jobType,
 yamlText);
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 0fdff431d4b..a46629de46d 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -121,13 +121,13 @@ public final class CDCJobAPI implements 
InventoryIncrementalJobAPI {
         CDCJobConfiguration jobConfig = new 
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
         ShardingSpherePreconditions.checkState(0 != 
jobConfig.getJobShardingCount(), () -> new 
PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
-        if (repositoryAPI.isJobConfigurationExisted(jobConfig.getJobId())) {
+        if 
(repositoryAPI.getJobConfigurationGovernanceRepository().isExisted(jobConfig.getJobId()))
 {
             log.warn("CDC job already exists in registry center, ignore, job 
id is `{}`", jobConfig.getJobId());
         } else {
-            repositoryAPI.persistJobRootInfo(jobConfig.getJobId(), 
getJobClass());
+            
repositoryAPI.getJobGovernanceRepository().create(jobConfig.getJobId(), 
getJobClass());
             JobConfigurationPOJO jobConfigPOJO = 
jobConfig.convertToJobConfigurationPOJO();
             jobConfigPOJO.setDisabled(true);
-            repositoryAPI.persistJobConfiguration(jobConfig.getJobId(), 
jobConfigPOJO);
+            
repositoryAPI.getJobConfigurationGovernanceRepository().persist(jobConfig.getJobId(),
 jobConfigPOJO);
             if (!param.isFull()) {
                 initIncrementalPosition(jobConfig);
             }
@@ -177,7 +177,7 @@ public final class CDCJobAPI implements 
InventoryIncrementalJobAPI {
                 }
                 IncrementalDumperContext dumperContext = 
buildDumperContext(jobConfig, i, new 
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
                 InventoryIncrementalJobItemProgress jobItemProgress = 
getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager, 
dumperContext);
-                
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(
+                
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().persist(
                         jobId, i, 
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
             }
         } catch (final SQLException ex) {
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index f9c1db64a65..e6b5c4a2ac9 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -80,7 +80,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
     public String createJobAndStart(final CreateConsistencyCheckJobParameter 
param) {
         String parentJobId = param.getParentJobId();
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
-        Optional<String> latestCheckJobId = 
repositoryAPI.getLatestCheckJobId(parentJobId);
+        Optional<String> latestCheckJobId = 
repositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
         if (latestCheckJobId.isPresent()) {
             PipelineJobItemManager<ConsistencyCheckJobItemProgress> 
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
             Optional<ConsistencyCheckJobItemProgress> progress = 
jobItemManager.getProgress(latestCheckJobId.get(), 0);
@@ -92,8 +92,8 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         verifyPipelineDatabaseType(param);
         PipelineContextKey contextKey = 
PipelineJobIdUtils.parseContextKey(parentJobId);
         String result = latestCheckJobId.map(s -> new 
ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new 
ConsistencyCheckJobId(contextKey, parentJobId)).marshal();
-        repositoryAPI.persistLatestCheckJobId(parentJobId, result);
-        repositoryAPI.deleteCheckJobResult(parentJobId, result);
+        
repositoryAPI.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
 result);
+        
repositoryAPI.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId,
 result);
         new PipelineJobManager(this).drop(result);
         YamlConsistencyCheckJobConfiguration yamlConfig = new 
YamlConsistencyCheckJobConfiguration();
         yamlConfig.setJobId(result);
@@ -126,7 +126,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
     }
     
     private String getLatestCheckJobId(final String parentJobId) {
-        Optional<String> result = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).getLatestCheckJobId(parentJobId);
+        Optional<String> result = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
         ShardingSpherePreconditions.checkState(result.isPresent(), () -> new 
ConsistencyCheckJobNotFoundException(parentJobId));
         return result.get();
     }
@@ -150,16 +150,16 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         new PipelineJobManager(this).stop(latestCheckJobId);
         PipelineContextKey contextKey = 
PipelineJobIdUtils.parseContextKey(parentJobId);
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey);
-        Collection<String> checkJobIds = 
repositoryAPI.listCheckJobIds(parentJobId);
+        Collection<String> checkJobIds = 
repositoryAPI.getJobCheckGovernanceRepository().listCheckJobIds(parentJobId);
         Optional<Integer> previousSequence = 
ConsistencyCheckSequence.getPreviousSequence(
                 
checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()),
 ConsistencyCheckJobId.parseSequence(latestCheckJobId));
         if (previousSequence.isPresent()) {
             String checkJobId = new ConsistencyCheckJobId(contextKey, 
parentJobId, previousSequence.get()).marshal();
-            repositoryAPI.persistLatestCheckJobId(parentJobId, checkJobId);
+            
repositoryAPI.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
 checkJobId);
         } else {
-            repositoryAPI.deleteLatestCheckJobId(parentJobId);
+            
repositoryAPI.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId);
         }
-        repositoryAPI.deleteCheckJobResult(parentJobId, latestCheckJobId);
+        
repositoryAPI.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId,
 latestCheckJobId);
         new PipelineJobManager(this).drop(latestCheckJobId);
     }
     
@@ -171,7 +171,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
      */
     public List<ConsistencyCheckJobItemInfo> getJobItemInfos(final String 
parentJobId) {
         GovernanceRepositoryAPI governanceRepositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
-        Optional<String> latestCheckJobId = 
governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
+        Optional<String> latestCheckJobId = 
governanceRepositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
         ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(), 
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
         String checkJobId = latestCheckJobId.get();
         PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager 
= new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
@@ -182,7 +182,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         List<ConsistencyCheckJobItemInfo> result = new LinkedList<>();
         ConsistencyCheckJobItemProgress jobItemProgress = progress.get();
         if (!Strings.isNullOrEmpty(jobItemProgress.getIgnoredTableNames())) {
-            Map<String, TableDataConsistencyCheckResult> checkJobResult = 
governanceRepositoryAPI.getCheckJobResult(parentJobId, latestCheckJobId.get());
+            Map<String, TableDataConsistencyCheckResult> checkJobResult = 
governanceRepositoryAPI.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId,
 latestCheckJobId.get());
             
result.addAll(buildIgnoredTableInfo(jobItemProgress.getIgnoredTableNames().split(","),
 checkJobResult));
         }
         if (Objects.equals(jobItemProgress.getIgnoredTableNames(), 
jobItemProgress.getTableNames())) {
@@ -212,7 +212,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
     
     private ConsistencyCheckJobItemInfo getJobItemInfo(final String 
parentJobId) {
         GovernanceRepositoryAPI governanceRepositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
-        Optional<String> latestCheckJobId = 
governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
+        Optional<String> latestCheckJobId = 
governanceRepositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
         ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(), 
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
         String checkJobId = latestCheckJobId.get();
         PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager 
= new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
@@ -233,7 +233,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
         fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
         result.setErrorMessage(new 
PipelineJobIteErrorMessageManager(checkJobId, 0).getErrorMessage());
-        Map<String, TableDataConsistencyCheckResult> checkJobResults = 
governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId);
+        Map<String, TableDataConsistencyCheckResult> checkJobResults = 
governanceRepositoryAPI.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId,
 checkJobId);
         result.setCheckSuccess(checkJobResults.isEmpty() ? null : 
checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched));
         
result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each 
-> !each.getValue().isIgnored() && !each.getValue().isMatched())
                 .map(Entry::getKey).collect(Collectors.joining(",")));
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index e75f71f126d..67b7f80a7dc 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -113,7 +113,8 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
                 log.info("job {} with check algorithm '{}' data consistency 
checker result: {}, stopping: {}",
                         parentJobId, checkJobConfig.getAlgorithmTypeName(), 
checkResultMap, jobItemContext.isStopping());
                 if (!jobItemContext.isStopping()) {
-                    
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).persistCheckJobResult(parentJobId,
 checkJobId, checkResultMap);
+                    PipelineAPIFactory.getGovernanceRepositoryAPI(
+                            
PipelineJobIdUtils.parseContextKey(parentJobId)).getJobCheckGovernanceRepository().persistCheckJobResult(parentJobId,
 checkJobId, checkResultMap);
                 }
             } finally {
                 
jobItemContext.getProgressContext().setCheckEndTimeMillis(System.currentTimeMillis());
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index d80211588ae..d18c30e45c6 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -306,7 +306,7 @@ public final class MigrationJobAPI implements 
InventoryIncrementalJobAPI {
     }
     
     private void dropCheckJobs(final String jobId) {
-        Collection<String> checkJobIds = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).listCheckJobIds(jobId);
+        Collection<String> checkJobIds = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().listCheckJobIds(jobId);
         if (checkJobIds.isEmpty()) {
             return;
         }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 8c46fd1761a..04cd7da0eb9 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -134,12 +134,12 @@ public final class MigrationJobPreparer {
         if (lockContext.tryLock(lockDefinition, 600000)) {
             log.info("try lock success, jobId={}, shardingItem={}, cost {} 
ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - 
startTimeMillis);
             try {
-                JobOffsetInfo offsetInfo = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId);
+                JobOffsetInfo offsetInfo = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().load(jobId);
                 if (!offsetInfo.isTargetSchemaTableCreated()) {
                     jobItemContext.setStatus(JobStatus.PREPARING);
                     jobItemManager.updateStatus(jobId, 
jobItemContext.getShardingItem(), JobStatus.PREPARING);
                     prepareAndCheckTarget(jobItemContext);
-                    
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobOffsetInfo(jobId,
 new JobOffsetInfo(true));
+                    
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().persist(jobId,
 new JobOffsetInfo(true));
                 }
             } finally {
                 log.info("unlock, jobId={}, shardingItem={}, cost {} ms", 
jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - 
startTimeMillis);
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
index 7806cda61dc..0e3e0ff9312 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
@@ -22,6 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.Placeholde
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
 import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineJobItemProcessGovernanceRepository;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
@@ -31,8 +32,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.event.DataChangedEvent;
 import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
 import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -54,7 +53,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -85,28 +83,43 @@ class GovernanceRepositoryAPIImplTest {
     }
     
     @Test
-    void assertIsJobConfigurationExisted() {
-        
assertFalse(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
-        JobConfigurationPOJO value = new JobConfigurationPOJO();
-        value.setJobName("foo_job");
-        value.setShardingTotalCount(1);
-        getClusterPersistRepository().persist("/pipeline/jobs/foo_job/config", 
YamlEngine.marshal(value));
-        
assertTrue(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
+    void assertWatch() throws InterruptedException {
+        String key = PipelineNodePath.DATA_PIPELINE_ROOT + "/1";
+        getClusterPersistRepository().persist(key, "");
+        boolean awaitResult = COUNT_DOWN_LATCH.await(10, TimeUnit.SECONDS);
+        assertTrue(awaitResult);
+        DataChangedEvent event = EVENT_ATOMIC_REFERENCE.get();
+        assertNotNull(event);
+        assertThat(event.getType(), anyOf(is(Type.ADDED), is(Type.UPDATED)));
     }
     
     @Test
-    void assertPersistJobItemProgress() {
-        MigrationJobItemContext jobItemContext = mockJobItemContext();
-        
governanceRepositoryAPI.updateJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), "testValue1");
-        
assertFalse(governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(),
 jobItemContext.getShardingItem()).isPresent());
-        
governanceRepositoryAPI.persistJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), "testValue1");
-        Optional<String> actual = 
governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
-        assertTrue(actual.isPresent());
-        assertThat(actual.get(), is("testValue1"));
-        
governanceRepositoryAPI.updateJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), "testValue2");
-        actual = 
governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
-        assertTrue(actual.isPresent());
-        assertThat(actual.get(), is("testValue2"));
+    void assertDeleteJob() {
+        ClusterPersistRepository clusterPersistRepository = 
getClusterPersistRepository();
+        clusterPersistRepository.persist(PipelineNodePath.DATA_PIPELINE_ROOT + 
"/1", "");
+        governanceRepositoryAPI.getJobGovernanceRepository().delete("1");
+        Optional<String> actual = new 
PipelineJobItemProcessGovernanceRepository(clusterPersistRepository).load("1", 
0);
+        assertFalse(actual.isPresent());
+    }
+    
+    @Test
+    void assertIsExistedJobConfiguration() {
+        ClusterPersistRepository clusterPersistRepository = 
getClusterPersistRepository();
+        
assertFalse(governanceRepositoryAPI.getJobConfigurationGovernanceRepository().isExisted("foo_job"));
+        clusterPersistRepository.persist("/pipeline/jobs/foo_job/config", 
"foo");
+        
assertTrue(governanceRepositoryAPI.getJobConfigurationGovernanceRepository().isExisted("foo_job"));
+    }
+    
+    @Test
+    void assertLatestCheckJobIdPersistenceDeletion() {
+        String parentJobId = "testParentJob";
+        String expectedCheckJobId = "testCheckJob";
+        
governanceRepositoryAPI.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
 expectedCheckJobId);
+        Optional<String> actualCheckJobIdOpt = 
governanceRepositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+        assertTrue(actualCheckJobIdOpt.isPresent());
+        assertThat(actualCheckJobIdOpt.get(), is(expectedCheckJobId));
+        
governanceRepositoryAPI.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId);
+        
assertFalse(governanceRepositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId).isPresent());
     }
     
     @Test
@@ -114,59 +127,43 @@ class GovernanceRepositoryAPIImplTest {
         MigrationJobItemContext jobItemContext = mockJobItemContext();
         Map<String, TableDataConsistencyCheckResult> actual = new HashMap<>();
         actual.put("test", new TableDataConsistencyCheckResult(true));
-        
governanceRepositoryAPI.persistCheckJobResult(jobItemContext.getJobId(), 
"j02123", actual);
-        Map<String, TableDataConsistencyCheckResult> checkResult = 
governanceRepositoryAPI.getCheckJobResult(jobItemContext.getJobId(), "j02123");
+        
governanceRepositoryAPI.getJobCheckGovernanceRepository().persistCheckJobResult(jobItemContext.getJobId(),
 "j02123", actual);
+        Map<String, TableDataConsistencyCheckResult> checkResult = 
governanceRepositoryAPI.getJobCheckGovernanceRepository().getCheckJobResult(jobItemContext.getJobId(),
 "j02123");
         assertThat(checkResult.size(), is(1));
         assertTrue(checkResult.get("test").isMatched());
     }
     
     @Test
-    void assertDeleteJob() {
-        
getClusterPersistRepository().persist(PipelineNodePath.DATA_PIPELINE_ROOT + 
"/1", "");
-        governanceRepositoryAPI.deleteJob("1");
-        Optional<String> actual = 
governanceRepositoryAPI.getJobItemProgress("1", 0);
-        assertFalse(actual.isPresent());
+    void assertPersistJobItemProcess() {
+        MigrationJobItemContext jobItemContext = mockJobItemContext();
+        
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), "testValue1");
+        
assertFalse(governanceRepositoryAPI.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
 jobItemContext.getShardingItem()).isPresent());
+        
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), "testValue1");
+        Optional<String> actual = 
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
 jobItemContext.getShardingItem());
+        assertTrue(actual.isPresent());
+        assertThat(actual.get(), is("testValue1"));
+        
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), "testValue2");
+        actual = 
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
 jobItemContext.getShardingItem());
+        assertTrue(actual.isPresent());
+        assertThat(actual.get(), is("testValue2"));
     }
     
     @Test
-    void assertWatch() throws InterruptedException {
-        String key = PipelineNodePath.DATA_PIPELINE_ROOT + "/1";
-        getClusterPersistRepository().persist(key, "");
-        boolean awaitResult = COUNT_DOWN_LATCH.await(10, TimeUnit.SECONDS);
-        assertTrue(awaitResult);
-        DataChangedEvent event = EVENT_ATOMIC_REFERENCE.get();
-        assertNotNull(event);
-        assertThat(event.getType(), anyOf(is(Type.ADDED), is(Type.UPDATED)));
+    void assertPersistJobOffset() {
+        
assertFalse(governanceRepositoryAPI.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated());
+        
governanceRepositoryAPI.getJobOffsetGovernanceRepository().persist("1", new 
JobOffsetInfo(true));
+        
assertTrue(governanceRepositoryAPI.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated());
     }
     
     @Test
     void assertGetShardingItems() {
         MigrationJobItemContext jobItemContext = mockJobItemContext();
-        
governanceRepositoryAPI.persistJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), "testValue");
-        List<Integer> shardingItems = 
governanceRepositoryAPI.getShardingItems(jobItemContext.getJobId());
+        
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), "testValue");
+        List<Integer> shardingItems = 
governanceRepositoryAPI.getJobOffsetGovernanceRepository().getShardingItems(jobItemContext.getJobId());
         assertThat(shardingItems.size(), is(1));
         assertThat(shardingItems.get(0), is(jobItemContext.getShardingItem()));
     }
     
-    @Test
-    void assertPersistJobOffsetInfo() {
-        
assertFalse(governanceRepositoryAPI.getJobOffsetInfo("1").isTargetSchemaTableCreated());
-        governanceRepositoryAPI.persistJobOffsetInfo("1", new 
JobOffsetInfo(true));
-        
assertTrue(governanceRepositoryAPI.getJobOffsetInfo("1").isTargetSchemaTableCreated());
-    }
-    
-    @Test
-    void assertLatestCheckJobIdPersistenceDeletion() {
-        String parentJobId = "testParentJob";
-        String expectedCheckJobId = "testCheckJob";
-        governanceRepositoryAPI.persistLatestCheckJobId(parentJobId, 
expectedCheckJobId);
-        Optional<String> actualCheckJobIdOpt = 
governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
-        assertTrue(actualCheckJobIdOpt.isPresent(), "Expected a checkJobId to 
be present");
-        assertEquals(expectedCheckJobId, actualCheckJobIdOpt.get(), "The 
retrieved checkJobId does not match the expected one");
-        governanceRepositoryAPI.deleteLatestCheckJobId(parentJobId);
-        
assertFalse(governanceRepositoryAPI.getLatestCheckJobId(parentJobId).isPresent(),
 "Expected no checkJobId to be present after deletion");
-    }
-    
     private ClusterPersistRepository getClusterPersistRepository() {
         ContextManager contextManager = 
PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager();
         return (ClusterPersistRepository) 
contextManager.getMetaDataContexts().getPersistService().getRepository();
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index a54697a5b66..075ac4950fe 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -51,7 +51,7 @@ class ConsistencyCheckJobTest {
         ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new 
PipelineContextKey(InstanceType.PROXY), 
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
         String checkJobId = pipelineJobId.marshal();
         Map<String, Object> expectTableCheckPosition = 
Collections.singletonMap("t_order", 100);
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(checkJobId,
 0,
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(checkJobId,
 0,
                 
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
         ConsistencyCheckJob consistencyCheckJob = new 
ConsistencyCheckJob(checkJobId);
         ConsistencyCheckJobItemContext actual = 
consistencyCheckJob.buildPipelineJobItemContext(
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index fe9ea2db83c..fd217a71ec8 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -91,20 +91,20 @@ class ConsistencyCheckJobAPITest {
                     new ConsistencyCheckJobConfiguration(checkJobId, 
parentJobId, null, null, TypedSPILoader.getService(DatabaseType.class, "H2")), 
0, JobStatus.FINISHED, null);
             jobItemManager.persistProgress(checkJobItemContext);
             Map<String, TableDataConsistencyCheckResult> 
dataConsistencyCheckResult = Collections.singletonMap("t_order", new 
TableDataConsistencyCheckResult(true));
-            repositoryAPI.persistCheckJobResult(parentJobId, checkJobId, 
dataConsistencyCheckResult);
-            Optional<String> latestCheckJobId = 
repositoryAPI.getLatestCheckJobId(parentJobId);
+            
repositoryAPI.getJobCheckGovernanceRepository().persistCheckJobResult(parentJobId,
 checkJobId, dataConsistencyCheckResult);
+            Optional<String> latestCheckJobId = 
repositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
             assertTrue(latestCheckJobId.isPresent());
             
assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()), 
is(expectedSequence++));
         }
         expectedSequence = 2;
         for (int i = 0; i < 2; i++) {
             jobAPI.dropByParentJobId(parentJobId);
-            Optional<String> latestCheckJobId = 
repositoryAPI.getLatestCheckJobId(parentJobId);
+            Optional<String> latestCheckJobId = 
repositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
             assertTrue(latestCheckJobId.isPresent());
             
assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()), 
is(expectedSequence--));
         }
         jobAPI.dropByParentJobId(parentJobId);
-        Optional<String> latestCheckJobId = 
repositoryAPI.getLatestCheckJobId(parentJobId);
+        Optional<String> latestCheckJobId = 
repositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
         assertFalse(latestCheckJobId.isPresent());
     }
 }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index ce9b79e1e49..7e0cdfdab72 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -303,7 +303,7 @@ class MigrationJobAPITest {
         YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new 
YamlInventoryIncrementalJobItemProgress();
         yamlJobItemProgress.setStatus(JobStatus.RUNNING.name());
         yamlJobItemProgress.setSourceDatabaseType("MySQL");
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(jobId.get(),
 0, YamlEngine.marshal(yamlJobItemProgress));
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(),
 0, YamlEngine.marshal(yamlJobItemProgress));
         List<InventoryIncrementalJobItemInfo> jobItemInfos = 
inventoryIncrementalJobManager.getJobItemInfos(jobId.get());
         assertThat(jobItemInfos.size(), is(1));
         InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
@@ -320,7 +320,7 @@ class MigrationJobAPITest {
         
yamlJobItemProgress.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK.name());
         yamlJobItemProgress.setProcessedRecordsCount(100);
         yamlJobItemProgress.setInventoryRecordsCount(50);
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(jobId.get(),
 0, YamlEngine.marshal(yamlJobItemProgress));
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(),
 0, YamlEngine.marshal(yamlJobItemProgress));
         List<InventoryIncrementalJobItemInfo> jobItemInfos = 
inventoryIncrementalJobManager.getJobItemInfos(jobId.get());
         InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
         assertThat(jobItemInfo.getJobItemProgress().getStatus(), 
is(JobStatus.EXECUTE_INCREMENTAL_TASK));
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 375062ab197..d23437a551a 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -63,7 +63,7 @@ class MigrationDataConsistencyCheckerTest {
         jobConfigurationPOJO.setShardingTotalCount(1);
         GovernanceRepositoryAPI governanceRepositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
         
getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config", 
jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
-        governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(), 
0, "");
+        
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().persist(jobConfig.getJobId(),
 0, "");
         Map<String, TableDataConsistencyCheckResult> actual = new 
MigrationDataConsistencyChecker(jobConfig, new 
MigrationProcessContext(jobConfig.getJobId(), null),
                 
createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE",
 null);
         String checkKey = "t_order";

Reply via email to