This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 56ebfb13634 Refactor PipelineGovernanceFacade (#29142)
56ebfb13634 is described below

commit 56ebfb136342f05230fce2c2f28fac9ddc946e00
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 23 19:46:37 2023 +0800

    Refactor PipelineGovernanceFacade (#29142)
---
 .../repository/PipelineGovernanceFacade.java       | 30 +++++-----------
 ...ineJobItemErrorMessageGovernanceRepository.java |  2 +-
 .../PipelineJobItemFacade.java}                    | 34 ++++++------------
 ...PipelineJobItemProcessGovernanceRepository.java |  2 +-
 .../PipelineJobCheckGovernanceRepository.java      |  2 +-
 ...pelineJobConfigurationGovernanceRepository.java |  2 +-
 .../PipelineJobFacade.java}                        | 40 +++++++++-------------
 .../{ => job}/PipelineJobGovernanceRepository.java |  2 +-
 .../PipelineJobOffsetGovernanceRepository.java     |  2 +-
 ...lineMetaDataDataSourceGovernanceRepository.java |  2 +-
 .../PipelineMetaDataFacade.java}                   | 34 ++++++------------
 ...taProcessConfigurationGovernanceRepository.java |  2 +-
 .../service/PipelineJobIteErrorMessageManager.java |  6 ++--
 .../core/job/service/PipelineJobItemManager.java   |  8 ++---
 .../core/job/service/PipelineJobManager.java       | 12 +++----
 .../metadata/PipelineDataSourcePersistService.java |  4 +--
 ...PipelineProcessConfigurationPersistService.java |  4 +--
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  8 ++---
 .../api/impl/ConsistencyCheckJobAPI.java           | 24 ++++++-------
 .../task/ConsistencyCheckTasksRunner.java          |  2 +-
 .../migration/api/impl/MigrationJobAPI.java        |  2 +-
 .../migration/prepare/MigrationJobPreparer.java    |  4 +--
 .../job/service/PipelineGovernanceFacadeTest.java  | 38 ++++++++++----------
 .../consistencycheck/ConsistencyCheckJobTest.java  |  2 +-
 .../api/impl/ConsistencyCheckJobAPITest.java       |  8 ++---
 .../migration/api/impl/MigrationJobAPITest.java    |  4 +--
 .../MigrationDataConsistencyCheckerTest.java       |  2 +-
 27 files changed, 117 insertions(+), 165 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java
index a2b33c35d69..a96baaf20a2 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java
@@ -20,6 +20,9 @@ package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository
 import lombok.AccessLevel;
 import lombok.Getter;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
+import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item.PipelineJobItemFacade;
+import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job.PipelineJobFacade;
+import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.metadata.PipelineMetaDataFacade;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
@@ -32,32 +35,17 @@ public final class PipelineGovernanceFacade {
     @Getter(AccessLevel.NONE)
     private final ClusterPersistRepository repository;
     
-    private final PipelineJobConfigurationGovernanceRepository 
jobConfigurationGovernanceRepository;
+    private final PipelineJobFacade jobFacade;
     
-    private final PipelineJobOffsetGovernanceRepository 
jobOffsetGovernanceRepository;
+    private final PipelineJobItemFacade jobItemFacade;
     
-    private final PipelineJobItemProcessGovernanceRepository 
jobItemProcessGovernanceRepository;
-    
-    private final PipelineJobItemErrorMessageGovernanceRepository 
jobItemErrorMessageGovernanceRepository;
-    
-    private final PipelineJobCheckGovernanceRepository 
jobCheckGovernanceRepository;
-    
-    private final PipelineJobGovernanceRepository jobGovernanceRepository;
-    
-    private final PipelineMetaDataDataSourceGovernanceRepository 
metaDataDataSourceGovernanceRepository;
-    
-    private final PipelineMetaDataProcessConfigurationGovernanceRepository 
metaDataProcessConfigurationGovernanceRepository;
+    private final PipelineMetaDataFacade metaDataFacade;
     
     public PipelineGovernanceFacade(final ClusterPersistRepository repository) 
{
         this.repository = repository;
-        jobConfigurationGovernanceRepository = new 
PipelineJobConfigurationGovernanceRepository(repository);
-        jobOffsetGovernanceRepository = new 
PipelineJobOffsetGovernanceRepository(repository);
-        jobItemProcessGovernanceRepository = new 
PipelineJobItemProcessGovernanceRepository(repository);
-        jobItemErrorMessageGovernanceRepository = new 
PipelineJobItemErrorMessageGovernanceRepository(repository);
-        jobCheckGovernanceRepository = new 
PipelineJobCheckGovernanceRepository(repository);
-        jobGovernanceRepository = new 
PipelineJobGovernanceRepository(repository);
-        metaDataDataSourceGovernanceRepository = new 
PipelineMetaDataDataSourceGovernanceRepository(repository);
-        metaDataProcessConfigurationGovernanceRepository = new 
PipelineMetaDataProcessConfigurationGovernanceRepository(repository);
+        jobFacade = new PipelineJobFacade(repository);
+        jobItemFacade = new PipelineJobItemFacade(repository);
+        metaDataFacade = new PipelineMetaDataFacade(repository);
     }
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemErrorMessageGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
similarity index 98%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemErrorMessageGovernanceRepository.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
index eee1bd77b57..213302fa88f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemErrorMessageGovernanceRepository.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item;
 
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemFacade.java
similarity index 53%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemFacade.java
index a403f0f7b0e..0bf38b96a22 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemFacade.java
@@ -15,37 +15,23 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item;
 
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import lombok.Getter;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
 /**
- * Pipeline job governance repository.
+ * Pipeline job item facade.
  */
-@RequiredArgsConstructor
-public final class PipelineJobGovernanceRepository {
+@Getter
+public final class PipelineJobItemFacade {
     
-    private final ClusterPersistRepository repository;
+    private final PipelineJobItemProcessGovernanceRepository process;
     
-    /**
-     * Create job.
-     *
-     * @param jobId job ID
-     * @param jobClass job class
-     */
-    public void create(final String jobId, final Class<? extends PipelineJob> 
jobClass) {
-        repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), 
jobClass.getName());
-    }
+    private final PipelineJobItemErrorMessageGovernanceRepository errorMessage;
     
-    /**
-     * Delete job.
-     *
-     * @param jobId job id
-     */
-    public void delete(final String jobId) {
-        repository.delete(PipelineMetaDataNode.getJobRootPath(jobId));
+    public PipelineJobItemFacade(final ClusterPersistRepository repository) {
+        process = new PipelineJobItemProcessGovernanceRepository(repository);
+        errorMessage = new 
PipelineJobItemErrorMessageGovernanceRepository(repository);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemProcessGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemProcessGovernanceRepository.java
similarity index 99%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemProcessGovernanceRepository.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemProcessGovernanceRepository.java
index 7ea4887fdb8..6beb8cfc8db 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemProcessGovernanceRepository.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemProcessGovernanceRepository.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item;
 
 import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobCheckGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobCheckGovernanceRepository.java
similarity index 99%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobCheckGovernanceRepository.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobCheckGovernanceRepository.java
index e5c44dfdd3c..15ae3ea230b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobCheckGovernanceRepository.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobCheckGovernanceRepository.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job;
 
 import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobConfigurationGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobConfigurationGovernanceRepository.java
similarity index 98%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobConfigurationGovernanceRepository.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobConfigurationGovernanceRepository.java
index 9b6d13bf983..1c4a1af066b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobConfigurationGovernanceRepository.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobConfigurationGovernanceRepository.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job;
 
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobFacade.java
similarity index 53%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobFacade.java
index a403f0f7b0e..9e101ece185 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobFacade.java
@@ -15,37 +15,29 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job;
 
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import lombok.Getter;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
 /**
- * Pipeline job governance repository.
+ * Pipeline job facade.
  */
-@RequiredArgsConstructor
-public final class PipelineJobGovernanceRepository {
+@Getter
+public final class PipelineJobFacade {
     
-    private final ClusterPersistRepository repository;
+    private final PipelineJobGovernanceRepository job;
     
-    /**
-     * Create job.
-     *
-     * @param jobId job ID
-     * @param jobClass job class
-     */
-    public void create(final String jobId, final Class<? extends PipelineJob> 
jobClass) {
-        repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), 
jobClass.getName());
-    }
+    private final PipelineJobConfigurationGovernanceRepository configuration;
+    
+    private final PipelineJobOffsetGovernanceRepository offset;
+    
+    private final PipelineJobCheckGovernanceRepository check;
     
-    /**
-     * Delete job.
-     *
-     * @param jobId job id
-     */
-    public void delete(final String jobId) {
-        repository.delete(PipelineMetaDataNode.getJobRootPath(jobId));
+    public PipelineJobFacade(final ClusterPersistRepository repository) {
+        job = new PipelineJobGovernanceRepository(repository);
+        configuration = new 
PipelineJobConfigurationGovernanceRepository(repository);
+        offset = new PipelineJobOffsetGovernanceRepository(repository);
+        check = new PipelineJobCheckGovernanceRepository(repository);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobGovernanceRepository.java
similarity index 98%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobGovernanceRepository.java
index a403f0f7b0e..606020cf405 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobGovernanceRepository.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobOffsetGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobOffsetGovernanceRepository.java
similarity index 99%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobOffsetGovernanceRepository.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobOffsetGovernanceRepository.java
index c007b67574f..c75ace6e8ca 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobOffsetGovernanceRepository.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobOffsetGovernanceRepository.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job;
 
 import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataDataSourceGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataDataSourceGovernanceRepository.java
similarity index 98%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataDataSourceGovernanceRepository.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataDataSourceGovernanceRepository.java
index 870cc0c20dc..70a973c8be5 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataDataSourceGovernanceRepository.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataDataSourceGovernanceRepository.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.metadata;
 
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataFacade.java
similarity index 53%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataFacade.java
index a403f0f7b0e..4d3a6a430a9 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataFacade.java
@@ -15,37 +15,23 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.metadata;
 
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import lombok.Getter;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
 /**
- * Pipeline job governance repository.
+ * Pipeline job item facade.
  */
-@RequiredArgsConstructor
-public final class PipelineJobGovernanceRepository {
+@Getter
+public final class PipelineMetaDataFacade {
     
-    private final ClusterPersistRepository repository;
+    private final PipelineMetaDataDataSourceGovernanceRepository dataSource;
     
-    /**
-     * Create job.
-     *
-     * @param jobId job ID
-     * @param jobClass job class
-     */
-    public void create(final String jobId, final Class<? extends PipelineJob> 
jobClass) {
-        repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), 
jobClass.getName());
-    }
+    private final PipelineMetaDataProcessConfigurationGovernanceRepository 
processConfiguration;
     
-    /**
-     * Delete job.
-     *
-     * @param jobId job id
-     */
-    public void delete(final String jobId) {
-        repository.delete(PipelineMetaDataNode.getJobRootPath(jobId));
+    public PipelineMetaDataFacade(final ClusterPersistRepository repository) {
+        dataSource = new 
PipelineMetaDataDataSourceGovernanceRepository(repository);
+        processConfiguration = new 
PipelineMetaDataProcessConfigurationGovernanceRepository(repository);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataProcessConfigurationGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataProcessConfigurationGovernanceRepository.java
similarity index 98%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataProcessConfigurationGovernanceRepository.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataProcessConfigurationGovernanceRepository.java
index e6f3e1ffb58..6f2802e735b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataProcessConfigurationGovernanceRepository.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataProcessConfigurationGovernanceRepository.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.metadata;
 
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
index b63e9b5b269..1066c68d9f3 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
@@ -46,7 +46,7 @@ public final class PipelineJobIteErrorMessageManager {
      * @return map, key is sharding item, value is error message
      */
     public String getErrorMessage() {
-        return 
Optional.ofNullable(governanceFacade.getJobItemErrorMessageGovernanceRepository().load(jobId,
 shardingItem)).orElse("");
+        return 
Optional.ofNullable(governanceFacade.getJobItemFacade().getErrorMessage().load(jobId,
 shardingItem)).orElse("");
     }
     
     /**
@@ -55,7 +55,7 @@ public final class PipelineJobIteErrorMessageManager {
      * @param error error
      */
     public void updateErrorMessage(final Object error) {
-        
governanceFacade.getJobItemErrorMessageGovernanceRepository().update(jobId, 
shardingItem, null == error ? "" : buildErrorMessage(error));
+        governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, 
shardingItem, null == error ? "" : buildErrorMessage(error));
     }
     
     private String buildErrorMessage(final Object error) {
@@ -66,6 +66,6 @@ public final class PipelineJobIteErrorMessageManager {
      * Clean job item error message.
      */
     public void cleanErrorMessage() {
-        
governanceFacade.getJobItemErrorMessageGovernanceRepository().update(jobId, 
shardingItem, "");
+        governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, 
shardingItem, "");
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
index d2463bc7f5f..c9643d284b1 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
@@ -55,7 +55,7 @@ public final class PipelineJobItemManager<T extends 
PipelineJobItemProgress> {
         }
         jobItemProgress.get().setStatus(status);
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId))
-                .getJobItemProcessGovernanceRepository().update(jobId, 
shardingItem, 
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
+                .getJobItemFacade().getProcess().update(jobId, shardingItem, 
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
     }
     
     /**
@@ -66,7 +66,7 @@ public final class PipelineJobItemManager<T extends 
PipelineJobItemProgress> {
      * @return job item progress
      */
     public Optional<T> getProgress(final String jobId, final int shardingItem) 
{
-        return 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().load(jobId,
 shardingItem)
+        return 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().load(jobId,
 shardingItem)
                 .map(optional -> 
swapper.swapToObject(YamlEngine.unmarshal(optional, 
swapper.getYamlProgressClass(), true)));
     }
     
@@ -77,7 +77,7 @@ public final class PipelineJobItemManager<T extends 
PipelineJobItemProgress> {
      */
     public void persistProgress(final PipelineJobItemContext jobItemContext) {
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
-                
.getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
+                
.getJobItemFacade().getProcess().persist(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
     }
     
     /**
@@ -87,7 +87,7 @@ public final class PipelineJobItemManager<T extends 
PipelineJobItemProgress> {
      */
     public void updateProgress(final PipelineJobItemContext jobItemContext) {
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
-                
.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
+                
.getJobItemFacade().getProcess().update(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
     }
     
     @SuppressWarnings("unchecked")
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 59d205b8528..1a11a902d52 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -75,12 +75,12 @@ public final class PipelineJobManager {
         String jobId = jobConfig.getJobId();
         ShardingSpherePreconditions.checkState(0 != 
jobConfig.getJobShardingCount(), () -> new 
PipelineJobCreationWithInvalidShardingCountException(jobId));
         PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
-        if 
(governanceFacade.getJobConfigurationGovernanceRepository().isExisted(jobId)) {
+        if 
(governanceFacade.getJobFacade().getConfiguration().isExisted(jobId)) {
             log.warn("jobId already exists in registry center, ignore, job id 
is `{}`", jobId);
             return Optional.of(jobId);
         }
-        governanceFacade.getJobGovernanceRepository().create(jobId, 
jobAPI.getJobClass());
-        
governanceFacade.getJobConfigurationGovernanceRepository().persist(jobId, 
jobConfig.convertToJobConfigurationPOJO());
+        governanceFacade.getJobFacade().getJob().create(jobId, 
jobAPI.getJobClass());
+        governanceFacade.getJobFacade().getConfiguration().persist(jobId, 
jobConfig.convertToJobConfigurationPOJO());
         return Optional.of(jobId);
     }
     
@@ -119,7 +119,7 @@ public final class PipelineJobManager {
     }
     
     private void startNextDisabledJob(final String jobId, final String 
toBeStartDisabledNextJobType) {
-        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional
 -> {
+        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional
 -> {
             try {
                 new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, 
toBeStartDisabledNextJobType)).startDisabledJob(optional);
                 // CHECKSTYLE:OFF
@@ -141,7 +141,7 @@ public final class PipelineJobManager {
     }
     
     private void stopPreviousJob(final String jobId, final String 
toBeStoppedPreviousJobType) {
-        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional
 -> {
+        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional
 -> {
             try {
                 new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, 
toBeStoppedPreviousJobType)).stop(optional);
                 // CHECKSTYLE:OFF
@@ -176,7 +176,7 @@ public final class PipelineJobManager {
     public void drop(final String jobId) {
         PipelineContextKey contextKey = 
PipelineJobIdUtils.parseContextKey(jobId);
         
PipelineAPIFactory.getJobOperateAPI(contextKey).remove(String.valueOf(jobId), 
null);
-        
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobGovernanceRepository().delete(jobId);
+        
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getJob().delete(jobId);
     }
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
index a951f9fee3c..924ae8859a9 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
@@ -39,7 +39,7 @@ public final class PipelineDataSourcePersistService 
implements PipelineMetaDataP
     @Override
     @SuppressWarnings("unchecked")
     public Map<String, DataSourcePoolProperties> load(final PipelineContextKey 
contextKey, final String jobType) {
-        String dataSourcesProps = 
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataDataSourceGovernanceRepository().load(jobType);
+        String dataSourcesProps = 
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataFacade().getDataSource().load(jobType);
         if (Strings.isNullOrEmpty(dataSourcesProps)) {
             return Collections.emptyMap();
         }
@@ -55,6 +55,6 @@ public final class PipelineDataSourcePersistService 
implements PipelineMetaDataP
         for (Entry<String, DataSourcePoolProperties> entry : 
propsMap.entrySet()) {
             dataSourceMap.put(entry.getKey(), 
swapper.swapToMap(entry.getValue()));
         }
-        
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataDataSourceGovernanceRepository().persist(jobType,
 YamlEngine.marshal(dataSourceMap));
+        
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataFacade().getDataSource().persist(jobType,
 YamlEngine.marshal(dataSourceMap));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
index 5b8c1bce798..97954a5cc96 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
@@ -34,7 +34,7 @@ public final class PipelineProcessConfigurationPersistService 
implements Pipelin
     
     @Override
     public PipelineProcessConfiguration load(final PipelineContextKey 
contextKey, final String jobType) {
-        String yamlText = 
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataProcessConfigurationGovernanceRepository().load(jobType);
+        String yamlText = 
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataFacade().getProcessConfiguration().load(jobType);
         if (Strings.isNullOrEmpty(yamlText)) {
             return null;
         }
@@ -45,6 +45,6 @@ public final class PipelineProcessConfigurationPersistService 
implements Pipelin
     @Override
     public void persist(final PipelineContextKey contextKey, final String 
jobType, final PipelineProcessConfiguration processConfig) {
         String yamlText = 
YamlEngine.marshal(swapper.swapToYamlConfiguration(processConfig));
-        
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataProcessConfigurationGovernanceRepository().persist(jobType,
 yamlText);
+        
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataFacade().getProcessConfiguration().persist(jobType,
 yamlText);
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 0b4a06dd368..614d06c9f40 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -121,13 +121,13 @@ public final class CDCJobAPI implements 
InventoryIncrementalJobAPI {
         CDCJobConfiguration jobConfig = new 
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
         ShardingSpherePreconditions.checkState(0 != 
jobConfig.getJobShardingCount(), () -> new 
PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
         PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
-        if 
(governanceFacade.getJobConfigurationGovernanceRepository().isExisted(jobConfig.getJobId()))
 {
+        if 
(governanceFacade.getJobFacade().getConfiguration().isExisted(jobConfig.getJobId()))
 {
             log.warn("CDC job already exists in registry center, ignore, job 
id is `{}`", jobConfig.getJobId());
         } else {
-            
governanceFacade.getJobGovernanceRepository().create(jobConfig.getJobId(), 
getJobClass());
+            
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), 
getJobClass());
             JobConfigurationPOJO jobConfigPOJO = 
jobConfig.convertToJobConfigurationPOJO();
             jobConfigPOJO.setDisabled(true);
-            
governanceFacade.getJobConfigurationGovernanceRepository().persist(jobConfig.getJobId(),
 jobConfigPOJO);
+            
governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(),
 jobConfigPOJO);
             if (!param.isFull()) {
                 initIncrementalPosition(jobConfig);
             }
@@ -177,7 +177,7 @@ public final class CDCJobAPI implements 
InventoryIncrementalJobAPI {
                 }
                 IncrementalDumperContext dumperContext = 
buildDumperContext(jobConfig, i, new 
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
                 InventoryIncrementalJobItemProgress jobItemProgress = 
getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager, 
dumperContext);
-                
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().persist(
+                
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist(
                         jobId, i, 
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
             }
         } catch (final SQLException ex) {
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index d9a48111722..a9a846bed49 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -80,7 +80,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
     public String createJobAndStart(final CreateConsistencyCheckJobParameter 
param) {
         String parentJobId = param.getParentJobId();
         PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId));
-        Optional<String> latestCheckJobId = 
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+        Optional<String> latestCheckJobId = 
governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
         if (latestCheckJobId.isPresent()) {
             PipelineJobItemManager<ConsistencyCheckJobItemProgress> 
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
             Optional<ConsistencyCheckJobItemProgress> progress = 
jobItemManager.getProgress(latestCheckJobId.get(), 0);
@@ -92,8 +92,8 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         verifyPipelineDatabaseType(param);
         PipelineContextKey contextKey = 
PipelineJobIdUtils.parseContextKey(parentJobId);
         String result = latestCheckJobId.map(s -> new 
ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new 
ConsistencyCheckJobId(contextKey, parentJobId)).marshal();
-        
governanceFacade.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
 result);
-        
governanceFacade.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId,
 result);
+        
governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId, 
result);
+        
governanceFacade.getJobFacade().getCheck().deleteCheckJobResult(parentJobId, 
result);
         new PipelineJobManager(this).drop(result);
         YamlConsistencyCheckJobConfiguration yamlConfig = new 
YamlConsistencyCheckJobConfiguration();
         yamlConfig.setJobId(result);
@@ -126,7 +126,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
     }
     
     private String getLatestCheckJobId(final String parentJobId) {
-        Optional<String> result = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+        Optional<String> result = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
         ShardingSpherePreconditions.checkState(result.isPresent(), () -> new 
ConsistencyCheckJobNotFoundException(parentJobId));
         return result.get();
     }
@@ -150,16 +150,16 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         new PipelineJobManager(this).stop(latestCheckJobId);
         PipelineContextKey contextKey = 
PipelineJobIdUtils.parseContextKey(parentJobId);
         PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
-        Collection<String> checkJobIds = 
governanceFacade.getJobCheckGovernanceRepository().listCheckJobIds(parentJobId);
+        Collection<String> checkJobIds = 
governanceFacade.getJobFacade().getCheck().listCheckJobIds(parentJobId);
         Optional<Integer> previousSequence = 
ConsistencyCheckSequence.getPreviousSequence(
                 
checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()),
 ConsistencyCheckJobId.parseSequence(latestCheckJobId));
         if (previousSequence.isPresent()) {
             String checkJobId = new ConsistencyCheckJobId(contextKey, 
parentJobId, previousSequence.get()).marshal();
-            
governanceFacade.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
 checkJobId);
+            
governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId, 
checkJobId);
         } else {
-            
governanceFacade.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId);
+            
governanceFacade.getJobFacade().getCheck().deleteLatestCheckJobId(parentJobId);
         }
-        
governanceFacade.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId,
 latestCheckJobId);
+        
governanceFacade.getJobFacade().getCheck().deleteCheckJobResult(parentJobId, 
latestCheckJobId);
         new PipelineJobManager(this).drop(latestCheckJobId);
     }
     
@@ -171,7 +171,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
      */
     public List<ConsistencyCheckJobItemInfo> getJobItemInfos(final String 
parentJobId) {
         PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId));
-        Optional<String> latestCheckJobId = 
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+        Optional<String> latestCheckJobId = 
governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
         ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(), 
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
         String checkJobId = latestCheckJobId.get();
         PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager 
= new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
@@ -182,7 +182,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         List<ConsistencyCheckJobItemInfo> result = new LinkedList<>();
         ConsistencyCheckJobItemProgress jobItemProgress = progress.get();
         if (!Strings.isNullOrEmpty(jobItemProgress.getIgnoredTableNames())) {
-            Map<String, TableDataConsistencyCheckResult> checkJobResult = 
governanceFacade.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId,
 latestCheckJobId.get());
+            Map<String, TableDataConsistencyCheckResult> checkJobResult = 
governanceFacade.getJobFacade().getCheck().getCheckJobResult(parentJobId, 
latestCheckJobId.get());
             
result.addAll(buildIgnoredTableInfo(jobItemProgress.getIgnoredTableNames().split(","),
 checkJobResult));
         }
         if (Objects.equals(jobItemProgress.getIgnoredTableNames(), 
jobItemProgress.getTableNames())) {
@@ -212,7 +212,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
     
     private ConsistencyCheckJobItemInfo getJobItemInfo(final String 
parentJobId) {
         PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId));
-        Optional<String> latestCheckJobId = 
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+        Optional<String> latestCheckJobId = 
governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
         ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(), 
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
         String checkJobId = latestCheckJobId.get();
         PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager 
= new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
@@ -233,7 +233,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
         fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
         result.setErrorMessage(new 
PipelineJobIteErrorMessageManager(checkJobId, 0).getErrorMessage());
-        Map<String, TableDataConsistencyCheckResult> checkJobResults = 
governanceFacade.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId,
 checkJobId);
+        Map<String, TableDataConsistencyCheckResult> checkJobResults = 
governanceFacade.getJobFacade().getCheck().getCheckJobResult(parentJobId, 
checkJobId);
         result.setCheckSuccess(checkJobResults.isEmpty() ? null : 
checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched));
         
result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each 
-> !each.getValue().isIgnored() && !each.getValue().isMatched())
                 .map(Entry::getKey).collect(Collectors.joining(",")));
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 0036cb46113..f705d6b01ab 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -114,7 +114,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
                         parentJobId, checkJobConfig.getAlgorithmTypeName(), 
checkResultMap, jobItemContext.isStopping());
                 if (!jobItemContext.isStopping()) {
                     PipelineAPIFactory.getPipelineGovernanceFacade(
-                            
PipelineJobIdUtils.parseContextKey(parentJobId)).getJobCheckGovernanceRepository().persistCheckJobResult(parentJobId,
 checkJobId, checkResultMap);
+                            
PipelineJobIdUtils.parseContextKey(parentJobId)).getJobFacade().getCheck().persistCheckJobResult(parentJobId,
 checkJobId, checkResultMap);
                 }
             } finally {
                 
jobItemContext.getProgressContext().setCheckEndTimeMillis(System.currentTimeMillis());
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 66d16150c09..e7e4f0bf06e 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -306,7 +306,7 @@ public final class MigrationJobAPI implements 
InventoryIncrementalJobAPI {
     }
     
     private void dropCheckJobs(final String jobId) {
-        Collection<String> checkJobIds = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().listCheckJobIds(jobId);
+        Collection<String> checkJobIds = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().listCheckJobIds(jobId);
         if (checkJobIds.isEmpty()) {
             return;
         }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 993009506e7..26918b29f9b 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -134,12 +134,12 @@ public final class MigrationJobPreparer {
         if (lockContext.tryLock(lockDefinition, 600000)) {
             log.info("try lock success, jobId={}, shardingItem={}, cost {} 
ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - 
startTimeMillis);
             try {
-                JobOffsetInfo offsetInfo = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().load(jobId);
+                JobOffsetInfo offsetInfo = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getOffset().load(jobId);
                 if (!offsetInfo.isTargetSchemaTableCreated()) {
                     jobItemContext.setStatus(JobStatus.PREPARING);
                     jobItemManager.updateStatus(jobId, 
jobItemContext.getShardingItem(), JobStatus.PREPARING);
                     prepareAndCheckTarget(jobItemContext);
-                    
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().persist(jobId,
 new JobOffsetInfo(true));
+                    
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getOffset().persist(jobId,
 new JobOffsetInfo(true));
                 }
             } finally {
                 log.info("unlock, jobId={}, shardingItem={}, cost {} ms", 
jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - 
startTimeMillis);
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java
index 75c9516d8fd..424f48d82c9 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java
@@ -22,7 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.Placeholde
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
 import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
-import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineJobItemProcessGovernanceRepository;
+import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item.PipelineJobItemProcessGovernanceRepository;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
@@ -96,7 +96,7 @@ class PipelineGovernanceFacadeTest {
     void assertDeleteJob() {
         ClusterPersistRepository clusterPersistRepository = 
getClusterPersistRepository();
         clusterPersistRepository.persist(PipelineNodePath.DATA_PIPELINE_ROOT + 
"/1", "");
-        governanceFacade.getJobGovernanceRepository().delete("1");
+        governanceFacade.getJobFacade().getJob().delete("1");
         Optional<String> actual = new 
PipelineJobItemProcessGovernanceRepository(clusterPersistRepository).load("1", 
0);
         assertFalse(actual.isPresent());
     }
@@ -104,21 +104,21 @@ class PipelineGovernanceFacadeTest {
     @Test
     void assertIsExistedJobConfiguration() {
         ClusterPersistRepository clusterPersistRepository = 
getClusterPersistRepository();
-        
assertFalse(governanceFacade.getJobConfigurationGovernanceRepository().isExisted("foo_job"));
+        
assertFalse(governanceFacade.getJobFacade().getConfiguration().isExisted("foo_job"));
         clusterPersistRepository.persist("/pipeline/jobs/foo_job/config", 
"foo");
-        
assertTrue(governanceFacade.getJobConfigurationGovernanceRepository().isExisted("foo_job"));
+        
assertTrue(governanceFacade.getJobFacade().getConfiguration().isExisted("foo_job"));
     }
     
     @Test
     void assertLatestCheckJobIdPersistenceDeletion() {
         String parentJobId = "testParentJob";
         String expectedCheckJobId = "testCheckJob";
-        
governanceFacade.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
 expectedCheckJobId);
-        Optional<String> actualCheckJobIdOpt = 
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+        
governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId, 
expectedCheckJobId);
+        Optional<String> actualCheckJobIdOpt = 
governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
         assertTrue(actualCheckJobIdOpt.isPresent());
         assertThat(actualCheckJobIdOpt.get(), is(expectedCheckJobId));
-        
governanceFacade.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId);
-        
assertFalse(governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId).isPresent());
+        
governanceFacade.getJobFacade().getCheck().deleteLatestCheckJobId(parentJobId);
+        
assertFalse(governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId).isPresent());
     }
     
     @Test
@@ -126,8 +126,8 @@ class PipelineGovernanceFacadeTest {
         MigrationJobItemContext jobItemContext = mockJobItemContext();
         Map<String, TableDataConsistencyCheckResult> actual = new HashMap<>();
         actual.put("test", new TableDataConsistencyCheckResult(true));
-        
governanceFacade.getJobCheckGovernanceRepository().persistCheckJobResult(jobItemContext.getJobId(),
 "j02123", actual);
-        Map<String, TableDataConsistencyCheckResult> checkResult = 
governanceFacade.getJobCheckGovernanceRepository().getCheckJobResult(jobItemContext.getJobId(),
 "j02123");
+        
governanceFacade.getJobFacade().getCheck().persistCheckJobResult(jobItemContext.getJobId(),
 "j02123", actual);
+        Map<String, TableDataConsistencyCheckResult> checkResult = 
governanceFacade.getJobFacade().getCheck().getCheckJobResult(jobItemContext.getJobId(),
 "j02123");
         assertThat(checkResult.size(), is(1));
         assertTrue(checkResult.get("test").isMatched());
     }
@@ -135,23 +135,23 @@ class PipelineGovernanceFacadeTest {
     @Test
     void assertPersistJobItemProcess() {
         MigrationJobItemContext jobItemContext = mockJobItemContext();
-        
governanceFacade.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), "testValue1");
-        
assertFalse(governanceFacade.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
 jobItemContext.getShardingItem()).isPresent());
-        
governanceFacade.getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), "testValue1");
-        Optional<String> actual = 
governanceFacade.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
 jobItemContext.getShardingItem());
+        
governanceFacade.getJobItemFacade().getProcess().update(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), "testValue1");
+        
assertFalse(governanceFacade.getJobItemFacade().getProcess().load(jobItemContext.getJobId(),
 jobItemContext.getShardingItem()).isPresent());
+        
governanceFacade.getJobItemFacade().getProcess().persist(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), "testValue1");
+        Optional<String> actual = 
governanceFacade.getJobItemFacade().getProcess().load(jobItemContext.getJobId(),
 jobItemContext.getShardingItem());
         assertTrue(actual.isPresent());
         assertThat(actual.get(), is("testValue1"));
-        
governanceFacade.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), "testValue2");
-        actual = 
governanceFacade.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
 jobItemContext.getShardingItem());
+        
governanceFacade.getJobItemFacade().getProcess().update(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), "testValue2");
+        actual = 
governanceFacade.getJobItemFacade().getProcess().load(jobItemContext.getJobId(),
 jobItemContext.getShardingItem());
         assertTrue(actual.isPresent());
         assertThat(actual.get(), is("testValue2"));
     }
     
     @Test
     void assertPersistJobOffset() {
-        
assertFalse(governanceFacade.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated());
-        governanceFacade.getJobOffsetGovernanceRepository().persist("1", new 
JobOffsetInfo(true));
-        
assertTrue(governanceFacade.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated());
+        
assertFalse(governanceFacade.getJobFacade().getOffset().load("1").isTargetSchemaTableCreated());
+        governanceFacade.getJobFacade().getOffset().persist("1", new 
JobOffsetInfo(true));
+        
assertTrue(governanceFacade.getJobFacade().getOffset().load("1").isTargetSchemaTableCreated());
     }
     
     private ClusterPersistRepository getClusterPersistRepository() {
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 437293bca13..3802e92bcd3 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -51,7 +51,7 @@ class ConsistencyCheckJobTest {
         ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new 
PipelineContextKey(InstanceType.PROXY), 
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
         String checkJobId = pipelineJobId.marshal();
         Map<String, Object> expectTableCheckPosition = 
Collections.singletonMap("t_order", 100);
-        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(checkJobId,
 0,
+        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId,
 0,
                 
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
         ConsistencyCheckJob consistencyCheckJob = new 
ConsistencyCheckJob(checkJobId);
         ConsistencyCheckJobItemContext actual = 
consistencyCheckJob.buildPipelineJobItemContext(
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index 309b944d451..b0ff58bbfaf 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -91,20 +91,20 @@ class ConsistencyCheckJobAPITest {
                     new ConsistencyCheckJobConfiguration(checkJobId, 
parentJobId, null, null, TypedSPILoader.getService(DatabaseType.class, "H2")), 
0, JobStatus.FINISHED, null);
             jobItemManager.persistProgress(checkJobItemContext);
             Map<String, TableDataConsistencyCheckResult> 
dataConsistencyCheckResult = Collections.singletonMap("t_order", new 
TableDataConsistencyCheckResult(true));
-            
governanceFacade.getJobCheckGovernanceRepository().persistCheckJobResult(parentJobId,
 checkJobId, dataConsistencyCheckResult);
-            Optional<String> latestCheckJobId = 
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+            
governanceFacade.getJobFacade().getCheck().persistCheckJobResult(parentJobId, 
checkJobId, dataConsistencyCheckResult);
+            Optional<String> latestCheckJobId = 
governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
             assertTrue(latestCheckJobId.isPresent());
             
assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()), 
is(expectedSequence++));
         }
         expectedSequence = 2;
         for (int i = 0; i < 2; i++) {
             jobAPI.dropByParentJobId(parentJobId);
-            Optional<String> latestCheckJobId = 
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+            Optional<String> latestCheckJobId = 
governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
             assertTrue(latestCheckJobId.isPresent());
             
assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()), 
is(expectedSequence--));
         }
         jobAPI.dropByParentJobId(parentJobId);
-        Optional<String> latestCheckJobId = 
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+        Optional<String> latestCheckJobId = 
governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
         assertFalse(latestCheckJobId.isPresent());
     }
 }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 9c3418ef846..be69e9dd0eb 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -303,7 +303,7 @@ class MigrationJobAPITest {
         YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new 
YamlInventoryIncrementalJobItemProgress();
         yamlJobItemProgress.setStatus(JobStatus.RUNNING.name());
         yamlJobItemProgress.setSourceDatabaseType("MySQL");
-        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(),
 0, YamlEngine.marshal(yamlJobItemProgress));
+        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobId.get(),
 0, YamlEngine.marshal(yamlJobItemProgress));
         List<InventoryIncrementalJobItemInfo> jobItemInfos = 
inventoryIncrementalJobManager.getJobItemInfos(jobId.get());
         assertThat(jobItemInfos.size(), is(1));
         InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
@@ -320,7 +320,7 @@ class MigrationJobAPITest {
         
yamlJobItemProgress.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK.name());
         yamlJobItemProgress.setProcessedRecordsCount(100);
         yamlJobItemProgress.setInventoryRecordsCount(50);
-        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(),
 0, YamlEngine.marshal(yamlJobItemProgress));
+        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobId.get(),
 0, YamlEngine.marshal(yamlJobItemProgress));
         List<InventoryIncrementalJobItemInfo> jobItemInfos = 
inventoryIncrementalJobManager.getJobItemInfos(jobId.get());
         InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
         assertThat(jobItemInfo.getJobItemProgress().getStatus(), 
is(JobStatus.EXECUTE_INCREMENTAL_TASK));
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 3389cb2555c..64d9d193213 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -63,7 +63,7 @@ class MigrationDataConsistencyCheckerTest {
         jobConfigurationPOJO.setShardingTotalCount(1);
         PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
         
getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config", 
jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
-        
governanceFacade.getJobItemProcessGovernanceRepository().persist(jobConfig.getJobId(),
 0, "");
+        
governanceFacade.getJobItemFacade().getProcess().persist(jobConfig.getJobId(), 
0, "");
         Map<String, TableDataConsistencyCheckResult> actual = new 
MigrationDataConsistencyChecker(jobConfig, new 
MigrationProcessContext(jobConfig.getJobId(), null),
                 
createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE",
 null);
         String checkKey = "t_order";

Reply via email to