This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 459c7050a2d Refactor PipelineGovernanceFacade (#29140)
459c7050a2d is described below

commit 459c7050a2da46a354151062f8c26ab0012a8b42
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 23 18:49:22 2023 +0800

    Refactor PipelineGovernanceFacade (#29140)
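    
    A minimal caller-side sketch of the renamed facade (the factory method, repository getters, and watchPipeLineRootPath are taken from the diff below; the jobId, jobClass, and log identifiers are placeholders for illustration only):
    
        // Sketch only, not part of this commit: resolve the facade for a job's context key.
        PipelineGovernanceFacade governanceFacade =
                PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
        // Per-concern repositories are reached through getters instead of the removed GovernanceRepositoryAPI interface.
        if (!governanceFacade.getJobConfigurationGovernanceRepository().isExisted(jobId)) {
            governanceFacade.getJobGovernanceRepository().create(jobId, jobClass);
        }
        // watchPipeLineRootPath remains the facade's only behavior method.
        governanceFacade.watchPipeLineRootPath(event -> log.info("pipeline node changed: {}", event.getKey()));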
---
 .../metadata/node/PipelineMetaDataNodeWatcher.java |  2 +-
 .../repository/GovernanceRepositoryAPI.java        | 89 ----------------------
 ...yAPIImpl.java => PipelineGovernanceFacade.java} | 14 +++-
 .../core/job/service/PipelineAPIFactory.java       | 17 ++---
 .../service/PipelineJobIteErrorMessageManager.java | 12 +--
 .../core/job/service/PipelineJobItemManager.java   |  8 +-
 .../core/job/service/PipelineJobManager.java       | 16 ++--
 .../metadata/PipelineDataSourcePersistService.java |  4 +-
 ...PipelineProcessConfigurationPersistService.java |  4 +-
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      | 12 +--
 .../api/impl/ConsistencyCheckJobAPI.java           | 34 ++++-----
 .../task/ConsistencyCheckTasksRunner.java          |  2 +-
 .../migration/api/impl/MigrationJobAPI.java        |  2 +-
 .../migration/prepare/MigrationJobPreparer.java    |  4 +-
 ...Test.java => PipelineGovernanceFacadeTest.java} | 46 +++++------
 .../consistencycheck/ConsistencyCheckJobTest.java  |  2 +-
 .../api/impl/ConsistencyCheckJobAPITest.java       | 12 +--
 .../migration/api/impl/MigrationJobAPITest.java    |  4 +-
 .../MigrationDataConsistencyCheckerTest.java       |  6 +-
 19 files changed, 103 insertions(+), 187 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java
index 42003ec1d66..df4f34b2506 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java
@@ -50,7 +50,7 @@ public final class PipelineMetaDataNodeWatcher {
     private PipelineMetaDataNodeWatcher(final PipelineContextKey contextKey) {
         listenerMap.putAll(ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedEventHandler.class)
                 .stream().collect(Collectors.toMap(PipelineMetaDataChangedEventHandler::getKeyPattern, each -> each)));
-        PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watchPipeLineRootPath(this::dispatchEvent);
+        PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).watchPipeLineRootPath(this::dispatchEvent);
     }
     
     private void dispatchEvent(final DataChangedEvent event) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
deleted file mode 100644
index 930d24226b0..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
-
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
-
-/**
- * Governance repository API.
- */
-public interface GovernanceRepositoryAPI {
-    
-    /**
-     * Get job configuration governance repository.
-     * 
-     * @return job configuration governance repository
-     */
-    PipelineJobConfigurationGovernanceRepository getJobConfigurationGovernanceRepository();
-    
-    /**
-     * Get job offset governance repository.
-     * 
-     * @return job offset governance repository
-     */
-    PipelineJobOffsetGovernanceRepository getJobOffsetGovernanceRepository();
-    
-    /**
-     * Get job item process governance repository.
-     *
-     * @return job item process governance repository
-     */
-    PipelineJobItemProcessGovernanceRepository getJobItemProcessGovernanceRepository();
-    
-    /**
-     * Get job item error message governance repository.
-     * 
-     * @return job item error message governance repository
-     */
-    PipelineJobItemErrorMessageGovernanceRepository getJobItemErrorMessageGovernanceRepository();
-    
-    /**
-     * Get job check governance repository.
-     * 
-     * @return job check governance repository
-     */
-    PipelineJobCheckGovernanceRepository getJobCheckGovernanceRepository();
-    
-    /**
-     * Get job governance repository.
-     * 
-     * @return job governance repository
-     */
-    PipelineJobGovernanceRepository getJobGovernanceRepository();
-    
-    /**
-     * Get meta data data source governance repository.
-     *
-     * @return meta data data source governance repository
-     */
-    PipelineMetaDataDataSourceGovernanceRepository getMetaDataDataSourceGovernanceRepository();
-    
-    /**
-     * Get meta data process configuration governance repository.
-     * 
-     * @return meta data process configuration governance repository
-     */
-    PipelineMetaDataProcessConfigurationGovernanceRepository getMetaDataProcessConfigurationGovernanceRepository();
-    
-    /**
-     * Watch pipeLine root path.
-     *
-     * @param listener data changed event listener
-     */
-    void watchPipeLineRootPath(DataChangedEventListener listener);
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java
similarity index 90%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java
index 2605c893bfb..a2b33c35d69 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java
@@ -17,17 +17,19 @@
 
 package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
 
+import lombok.AccessLevel;
 import lombok.Getter;
 import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
 /**
- * Governance repository API impl.
+ * Pipeline governance facade.
  */
 @Getter
-public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAPI {
+public final class PipelineGovernanceFacade {
     
+    @Getter(AccessLevel.NONE)
     private final ClusterPersistRepository repository;
     
     private final PipelineJobConfigurationGovernanceRepository jobConfigurationGovernanceRepository;
@@ -46,7 +48,7 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     
     private final PipelineMetaDataProcessConfigurationGovernanceRepository metaDataProcessConfigurationGovernanceRepository;
     
-    public GovernanceRepositoryAPIImpl(final ClusterPersistRepository repository) {
+    public PipelineGovernanceFacade(final ClusterPersistRepository repository) {
         this.repository = repository;
         jobConfigurationGovernanceRepository = new PipelineJobConfigurationGovernanceRepository(repository);
         jobOffsetGovernanceRepository = new PipelineJobOffsetGovernanceRepository(repository);
@@ -58,7 +60,11 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
         metaDataProcessConfigurationGovernanceRepository = new PipelineMetaDataProcessConfigurationGovernanceRepository(repository);
     }
     
-    @Override
+    /**
+     * Watch pipeLine root path.
+     *
+     * @param listener data changed event listener
+     */
     public void watchPipeLineRootPath(final DataChangedEventListener listener) {
         repository.watch(PipelineNodePath.DATA_PIPELINE_ROOT, listener);
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineAPIFactory.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineAPIFactory.java
index e5b4fbc540c..99108b15730 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineAPIFactory.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineAPIFactory.java
@@ -27,8 +27,7 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.common.registrycenter.elasticjob.CoordinatorRegistryCenterInitializer;
-import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
-import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPIImpl;
+import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobOperateAPI;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobStatisticsAPI;
@@ -49,22 +48,22 @@ import java.util.concurrent.ConcurrentHashMap;
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class PipelineAPIFactory {
     
-    private static final Map<PipelineContextKey, LazyInitializer<GovernanceRepositoryAPI>> GOVERNANCE_REPOSITORY_API_MAP = new ConcurrentHashMap<>();
+    private static final Map<PipelineContextKey, LazyInitializer<PipelineGovernanceFacade>> GOVERNANCE_FACADE_MAP = new ConcurrentHashMap<>();
     
     /**
-     * Get governance repository API.
+     * Get pipeline governance facade.
      *
      * @param contextKey context key
-     * @return governance repository API
+     * @return pipeline governance facade
      */
     @SneakyThrows(ConcurrentException.class)
-    public static GovernanceRepositoryAPI getGovernanceRepositoryAPI(final PipelineContextKey contextKey) {
-        return GOVERNANCE_REPOSITORY_API_MAP.computeIfAbsent(contextKey, key -> new LazyInitializer<GovernanceRepositoryAPI>() {
+    public static PipelineGovernanceFacade getPipelineGovernanceFacade(final PipelineContextKey contextKey) {
+        return GOVERNANCE_FACADE_MAP.computeIfAbsent(contextKey, key -> new LazyInitializer<PipelineGovernanceFacade>() {
             
             @Override
-            protected GovernanceRepositoryAPI initialize() {
+            protected PipelineGovernanceFacade initialize() {
                 ContextManager contextManager = PipelineContextManager.getContext(contextKey).getContextManager();
-                return new GovernanceRepositoryAPIImpl((ClusterPersistRepository) contextManager.getMetaDataContexts().getPersistService().getRepository());
+                return new PipelineGovernanceFacade((ClusterPersistRepository) contextManager.getMetaDataContexts().getPersistService().getRepository());
             }
         }).get();
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
index 879b241e58e..b63e9b5b269 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.job.service;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 
 import java.util.Optional;
@@ -32,12 +32,12 @@ public final class PipelineJobIteErrorMessageManager {
     
     private final int shardingItem;
     
-    private final GovernanceRepositoryAPI governanceRepositoryAPI;
+    private final PipelineGovernanceFacade governanceFacade;
     
     public PipelineJobIteErrorMessageManager(final String jobId, final int shardingItem) {
         this.jobId = jobId;
         this.shardingItem = shardingItem;
-        governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
+        governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
     }
     
     /**
@@ -46,7 +46,7 @@ public final class PipelineJobIteErrorMessageManager {
      * @return map, key is sharding item, value is error message
      */
     public String getErrorMessage() {
-        return Optional.ofNullable(governanceRepositoryAPI.getJobItemErrorMessageGovernanceRepository().load(jobId, shardingItem)).orElse("");
+        return Optional.ofNullable(governanceFacade.getJobItemErrorMessageGovernanceRepository().load(jobId, shardingItem)).orElse("");
     }
     
     /**
@@ -55,7 +55,7 @@ public final class PipelineJobIteErrorMessageManager {
      * @param error error
      */
     public void updateErrorMessage(final Object error) {
-        governanceRepositoryAPI.getJobItemErrorMessageGovernanceRepository().update(jobId, shardingItem, null == error ? "" : buildErrorMessage(error));
+        governanceFacade.getJobItemErrorMessageGovernanceRepository().update(jobId, shardingItem, null == error ? "" : buildErrorMessage(error));
     }
     
     private String buildErrorMessage(final Object error) {
@@ -66,6 +66,6 @@ public final class PipelineJobIteErrorMessageManager {
      * Clean job item error message.
      */
     public void cleanErrorMessage() {
-        governanceRepositoryAPI.getJobItemErrorMessageGovernanceRepository().update(jobId, shardingItem, "");
+        governanceFacade.getJobItemErrorMessageGovernanceRepository().update(jobId, shardingItem, "");
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
index 78a880c68c9..d2463bc7f5f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
@@ -54,7 +54,7 @@ public final class PipelineJobItemManager<T extends PipelineJobItemProgress> {
             return;
         }
         jobItemProgress.get().setStatus(status);
-        PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId))
+        PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId))
                 .getJobItemProcessGovernanceRepository().update(jobId, shardingItem, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
     }
     
@@ -66,7 +66,7 @@ public final class PipelineJobItemManager<T extends PipelineJobItemProgress> {
      * @return job item progress
      */
     public Optional<T> getProgress(final String jobId, final int shardingItem) {
-        return PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().load(jobId, shardingItem)
+        return PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().load(jobId, shardingItem)
                 .map(optional -> swapper.swapToObject(YamlEngine.unmarshal(optional, swapper.getYamlProgressClass(), true)));
     }
     
@@ -76,7 +76,7 @@ public final class PipelineJobItemManager<T extends PipelineJobItemProgress> {
      * @param jobItemContext job item context
     */
     public void persistProgress(final PipelineJobItemContext jobItemContext) {
-        PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+        PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
                 .getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
     }
     
@@ -86,7 +86,7 @@ public final class PipelineJobItemManager<T extends PipelineJobItemProgress> {
      * @param jobItemContext job item context
      */
     public void updateProgress(final PipelineJobItemContext jobItemContext) {
-        PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+        PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
                 .getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index a3a89ccf6e6..59d205b8528 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
 import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
@@ -74,13 +74,13 @@ public final class PipelineJobManager {
     public Optional<String> start(final PipelineJobConfiguration jobConfig) {
         String jobId = jobConfig.getJobId();
         ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId));
-        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
-        if (repositoryAPI.getJobConfigurationGovernanceRepository().isExisted(jobId)) {
+        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
+        if (governanceFacade.getJobConfigurationGovernanceRepository().isExisted(jobId)) {
             log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId);
             return Optional.of(jobId);
         }
-        repositoryAPI.getJobGovernanceRepository().create(jobId, jobAPI.getJobClass());
-        repositoryAPI.getJobConfigurationGovernanceRepository().persist(jobId, jobConfig.convertToJobConfigurationPOJO());
+        governanceFacade.getJobGovernanceRepository().create(jobId, jobAPI.getJobClass());
+        governanceFacade.getJobConfigurationGovernanceRepository().persist(jobId, jobConfig.convertToJobConfigurationPOJO());
         return Optional.of(jobId);
     }
     
@@ -119,7 +119,7 @@ public final class PipelineJobManager {
     }
     
     private void startNextDisabledJob(final String jobId, final String toBeStartDisabledNextJobType) {
-        PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional -> {
+        PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional -> {
             try {
                 new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, toBeStartDisabledNextJobType)).startDisabledJob(optional);
                 // CHECKSTYLE:OFF
@@ -141,7 +141,7 @@ public final class PipelineJobManager {
     }
     
     private void stopPreviousJob(final String jobId, final String toBeStoppedPreviousJobType) {
-        PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional -> {
+        PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional -> {
             try {
                 new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, toBeStoppedPreviousJobType)).stop(optional);
                 // CHECKSTYLE:OFF
@@ -176,7 +176,7 @@ public final class PipelineJobManager {
     public void drop(final String jobId) {
         PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
         PipelineAPIFactory.getJobOperateAPI(contextKey).remove(String.valueOf(jobId), null);
-        PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getJobGovernanceRepository().delete(jobId);
+        PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobGovernanceRepository().delete(jobId);
     }
     
     /**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
index 42e0035890d..a951f9fee3c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
@@ -39,7 +39,7 @@ public final class PipelineDataSourcePersistService implements PipelineMetaDataP
     @Override
     @SuppressWarnings("unchecked")
     public Map<String, DataSourcePoolProperties> load(final PipelineContextKey contextKey, final String jobType) {
-        String dataSourcesProps = PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataDataSourceGovernanceRepository().load(jobType);
+        String dataSourcesProps = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataDataSourceGovernanceRepository().load(jobType);
         if (Strings.isNullOrEmpty(dataSourcesProps)) {
             return Collections.emptyMap();
         }
@@ -55,6 +55,6 @@ public final class PipelineDataSourcePersistService implements PipelineMetaDataP
         for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
             dataSourceMap.put(entry.getKey(), swapper.swapToMap(entry.getValue()));
         }
-        PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataDataSourceGovernanceRepository().persist(jobType, YamlEngine.marshal(dataSourceMap));
+        PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataDataSourceGovernanceRepository().persist(jobType, YamlEngine.marshal(dataSourceMap));
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
index b20a5e22935..5b8c1bce798 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
@@ -34,7 +34,7 @@ public final class PipelineProcessConfigurationPersistService implements Pipelin
     
     @Override
     public PipelineProcessConfiguration load(final PipelineContextKey contextKey, final String jobType) {
-        String yamlText = PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataProcessConfigurationGovernanceRepository().load(jobType);
+        String yamlText = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataProcessConfigurationGovernanceRepository().load(jobType);
         if (Strings.isNullOrEmpty(yamlText)) {
             return null;
         }
@@ -45,6 +45,6 @@ public final class PipelineProcessConfigurationPersistService implements Pipelin
     @Override
     public void persist(final PipelineContextKey contextKey, final String jobType, final PipelineProcessConfiguration processConfig) {
         String yamlText = YamlEngine.marshal(swapper.swapToYamlConfiguration(processConfig));
-        PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataProcessConfigurationGovernanceRepository().persist(jobType, yamlText);
+        PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataProcessConfigurationGovernanceRepository().persist(jobType, yamlText);
     }
 }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index a46629de46d..0b4a06dd368 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -51,7 +51,7 @@ import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrem
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
 import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
@@ -120,14 +120,14 @@ public final class CDCJobAPI implements InventoryIncrementalJobAPI {
         extendYamlJobConfiguration(contextKey, yamlJobConfig);
         CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
         ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
-        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
-        if (repositoryAPI.getJobConfigurationGovernanceRepository().isExisted(jobConfig.getJobId())) {
+        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+        if (governanceFacade.getJobConfigurationGovernanceRepository().isExisted(jobConfig.getJobId())) {
             log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId());
         } else {
-            repositoryAPI.getJobGovernanceRepository().create(jobConfig.getJobId(), getJobClass());
+            governanceFacade.getJobGovernanceRepository().create(jobConfig.getJobId(), getJobClass());
             JobConfigurationPOJO jobConfigPOJO = jobConfig.convertToJobConfigurationPOJO();
             jobConfigPOJO.setDisabled(true);
-            repositoryAPI.getJobConfigurationGovernanceRepository().persist(jobConfig.getJobId(), jobConfigPOJO);
+            governanceFacade.getJobConfigurationGovernanceRepository().persist(jobConfig.getJobId(), jobConfigPOJO);
             if (!param.isFull()) {
                 initIncrementalPosition(jobConfig);
             }
@@ -177,7 +177,7 @@ public final class CDCJobAPI implements InventoryIncrementalJobAPI {
                 }
                 IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
                 InventoryIncrementalJobItemProgress jobItemProgress = getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext);
-                PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().persist(
+                PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().persist(
                         jobId, i, YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
             }
         } catch (final SQLException ex) {
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index e6b5c4a2ac9..d9a48111722 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
-import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
@@ -79,8 +79,8 @@ public final class ConsistencyCheckJobAPI implements PipelineJobAPI {
      */
     public String createJobAndStart(final CreateConsistencyCheckJobParameter param) {
         String parentJobId = param.getParentJobId();
-        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
-        Optional<String> latestCheckJobId = repositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId));
+        Optional<String> latestCheckJobId = governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
         if (latestCheckJobId.isPresent()) {
             PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
             Optional<ConsistencyCheckJobItemProgress> progress = jobItemManager.getProgress(latestCheckJobId.get(), 0);
@@ -92,8 +92,8 @@ public final class ConsistencyCheckJobAPI implements PipelineJobAPI {
         verifyPipelineDatabaseType(param);
         PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId);
         String result = latestCheckJobId.map(s -> new ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)).marshal();
-        repositoryAPI.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId, result);
-        repositoryAPI.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId, result);
+        governanceFacade.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId, result);
+        governanceFacade.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId, result);
         new PipelineJobManager(this).drop(result);
         YamlConsistencyCheckJobConfiguration yamlConfig = new YamlConsistencyCheckJobConfiguration();
         yamlConfig.setJobId(result);
@@ -126,7 +126,7 @@ public final class ConsistencyCheckJobAPI implements PipelineJobAPI {
     }
     
     private String getLatestCheckJobId(final String parentJobId) {
-        Optional<String> result = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+        Optional<String> result = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
         ShardingSpherePreconditions.checkState(result.isPresent(), () -> new ConsistencyCheckJobNotFoundException(parentJobId));
         return result.get();
     }
@@ -149,17 +149,17 @@ public final class ConsistencyCheckJobAPI implements PipelineJobAPI {
         String latestCheckJobId = getLatestCheckJobId(parentJobId);
         new PipelineJobManager(this).stop(latestCheckJobId);
         PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId);
-        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey);
-        Collection<String> checkJobIds = repositoryAPI.getJobCheckGovernanceRepository().listCheckJobIds(parentJobId);
+        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
+        Collection<String> checkJobIds = governanceFacade.getJobCheckGovernanceRepository().listCheckJobIds(parentJobId);
         Optional<Integer> previousSequence = ConsistencyCheckSequence.getPreviousSequence(
                 checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()), ConsistencyCheckJobId.parseSequence(latestCheckJobId));
         if (previousSequence.isPresent()) {
             String checkJobId = new ConsistencyCheckJobId(contextKey, parentJobId, previousSequence.get()).marshal();
-            repositoryAPI.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId, checkJobId);
+            governanceFacade.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId, checkJobId);
         } else {
-            repositoryAPI.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId);
+            governanceFacade.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId);
         }
-        repositoryAPI.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId, latestCheckJobId);
+        governanceFacade.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId, latestCheckJobId);
         new PipelineJobManager(this).drop(latestCheckJobId);
     }
     
@@ -170,8 +170,8 @@ public final class ConsistencyCheckJobAPI implements PipelineJobAPI {
      * @return consistency job item infos
      */
     public List<ConsistencyCheckJobItemInfo> getJobItemInfos(final String parentJobId) {
-        GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
-        Optional<String> latestCheckJobId = governanceRepositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId));
+        Optional<String> latestCheckJobId = governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
         ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(), () -> new ConsistencyCheckJobNotFoundException(parentJobId));
         String checkJobId = latestCheckJobId.get();
         PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
@@ -182,7 +182,7 @@ public final class ConsistencyCheckJobAPI implements PipelineJobAPI {
         List<ConsistencyCheckJobItemInfo> result = new LinkedList<>();
         ConsistencyCheckJobItemProgress jobItemProgress = progress.get();
         if (!Strings.isNullOrEmpty(jobItemProgress.getIgnoredTableNames())) {
-            Map<String, TableDataConsistencyCheckResult> checkJobResult = governanceRepositoryAPI.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId, latestCheckJobId.get());
+            Map<String, TableDataConsistencyCheckResult> checkJobResult = governanceFacade.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId, latestCheckJobId.get());
             result.addAll(buildIgnoredTableInfo(jobItemProgress.getIgnoredTableNames().split(","), checkJobResult));
         }
         if (Objects.equals(jobItemProgress.getIgnoredTableNames(), jobItemProgress.getTableNames())) {
@@ -211,8 +211,8 @@ public final class ConsistencyCheckJobAPI implements PipelineJobAPI {
     }
     
     private ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) {
-        GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
-        Optional<String> latestCheckJobId = governanceRepositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId));
+        Optional<String> latestCheckJobId = governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
         ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(), () -> new ConsistencyCheckJobNotFoundException(parentJobId));
         String checkJobId = latestCheckJobId.get();
         PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
@@ -233,7 +233,7 @@ public final class ConsistencyCheckJobAPI implements PipelineJobAPI {
         result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
         fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
         result.setErrorMessage(new PipelineJobIteErrorMessageManager(checkJobId, 0).getErrorMessage());
-        Map<String, TableDataConsistencyCheckResult> checkJobResults = governanceRepositoryAPI.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId, checkJobId);
+        Map<String, TableDataConsistencyCheckResult> checkJobResults = governanceFacade.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId, checkJobId);
         result.setCheckSuccess(checkJobResults.isEmpty() ? null : checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched));
         result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched())
                 .map(Entry::getKey).collect(Collectors.joining(",")));
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 67b7f80a7dc..0036cb46113 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -113,7 +113,7 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
                 log.info("job {} with check algorithm '{}' data consistency checker result: {}, stopping: {}",
                         parentJobId, checkJobConfig.getAlgorithmTypeName(), checkResultMap, jobItemContext.isStopping());
                 if (!jobItemContext.isStopping()) {
-                    PipelineAPIFactory.getGovernanceRepositoryAPI(
+                    PipelineAPIFactory.getPipelineGovernanceFacade(
                             PipelineJobIdUtils.parseContextKey(parentJobId)).getJobCheckGovernanceRepository().persistCheckJobResult(parentJobId, checkJobId, checkResultMap);
                 }
             } finally {
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index d18c30e45c6..66d16150c09 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -306,7 +306,7 @@ public final class MigrationJobAPI implements InventoryIncrementalJobAPI {
     }
     
     private void dropCheckJobs(final String jobId) {
-        Collection<String> checkJobIds = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().listCheckJobIds(jobId);
+        Collection<String> checkJobIds = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().listCheckJobIds(jobId);
         if (checkJobIds.isEmpty()) {
             return;
         }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 04cd7da0eb9..993009506e7 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -134,12 +134,12 @@ public final class MigrationJobPreparer {
         if (lockContext.tryLock(lockDefinition, 600000)) {
             log.info("try lock success, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
             try {
-                JobOffsetInfo offsetInfo = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().load(jobId);
+                JobOffsetInfo offsetInfo = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().load(jobId);
                 if (!offsetInfo.isTargetSchemaTableCreated()) {
                     jobItemContext.setStatus(JobStatus.PREPARING);
                     jobItemManager.updateStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING);
                     prepareAndCheckTarget(jobItemContext);
-                    PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().persist(jobId, new JobOffsetInfo(true));
+                    PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().persist(jobId, new JobOffsetInfo(true));
                 }
             } finally {
                 log.info("unlock, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java
similarity index 74%
rename from 
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
rename to 
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java
index 0ff4eaa6273..75c9516d8fd 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java
@@ -21,7 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextMan
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
-import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
 import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineJobItemProcessGovernanceRepository;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
@@ -57,9 +57,9 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
-class GovernanceRepositoryAPIImplTest {
+class PipelineGovernanceFacadeTest {
     
-    private static GovernanceRepositoryAPI governanceRepositoryAPI;
+    private static PipelineGovernanceFacade governanceFacade;
     
     private static final AtomicReference<DataChangedEvent> 
EVENT_ATOMIC_REFERENCE = new AtomicReference<>();
     
@@ -68,12 +68,12 @@ class GovernanceRepositoryAPIImplTest {
     @BeforeAll
     static void beforeClass() {
         PipelineContextUtils.mockModeConfigAndContextManager();
-        governanceRepositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
+        governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
         watch();
     }
     
     private static void watch() {
-        governanceRepositoryAPI.watchPipeLineRootPath(event -> {
+        governanceFacade.watchPipeLineRootPath(event -> {
             if ((PipelineNodePath.DATA_PIPELINE_ROOT + 
"/1").equals(event.getKey())) {
                 EVENT_ATOMIC_REFERENCE.set(event);
                 COUNT_DOWN_LATCH.countDown();
@@ -96,7 +96,7 @@ class GovernanceRepositoryAPIImplTest {
     void assertDeleteJob() {
         ClusterPersistRepository clusterPersistRepository = 
getClusterPersistRepository();
         clusterPersistRepository.persist(PipelineNodePath.DATA_PIPELINE_ROOT + 
"/1", "");
-        governanceRepositoryAPI.getJobGovernanceRepository().delete("1");
+        governanceFacade.getJobGovernanceRepository().delete("1");
         Optional<String> actual = new 
PipelineJobItemProcessGovernanceRepository(clusterPersistRepository).load("1", 
0);
         assertFalse(actual.isPresent());
     }
@@ -104,21 +104,21 @@ class GovernanceRepositoryAPIImplTest {
     @Test
     void assertIsExistedJobConfiguration() {
         ClusterPersistRepository clusterPersistRepository = 
getClusterPersistRepository();
-        
assertFalse(governanceRepositoryAPI.getJobConfigurationGovernanceRepository().isExisted("foo_job"));
+        
assertFalse(governanceFacade.getJobConfigurationGovernanceRepository().isExisted("foo_job"));
         clusterPersistRepository.persist("/pipeline/jobs/foo_job/config", 
"foo");
-        
assertTrue(governanceRepositoryAPI.getJobConfigurationGovernanceRepository().isExisted("foo_job"));
+        
assertTrue(governanceFacade.getJobConfigurationGovernanceRepository().isExisted("foo_job"));
     }
     
     @Test
     void assertLatestCheckJobIdPersistenceDeletion() {
         String parentJobId = "testParentJob";
         String expectedCheckJobId = "testCheckJob";
-        
governanceRepositoryAPI.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
 expectedCheckJobId);
-        Optional<String> actualCheckJobIdOpt = 
governanceRepositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+        
governanceFacade.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
 expectedCheckJobId);
+        Optional<String> actualCheckJobIdOpt = 
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
         assertTrue(actualCheckJobIdOpt.isPresent());
         assertThat(actualCheckJobIdOpt.get(), is(expectedCheckJobId));
-        
governanceRepositoryAPI.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId);
-        
assertFalse(governanceRepositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId).isPresent());
+        
governanceFacade.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId);
+        
assertFalse(governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId).isPresent());
     }
     
     @Test
@@ -126,8 +126,8 @@ class GovernanceRepositoryAPIImplTest {
         MigrationJobItemContext jobItemContext = mockJobItemContext();
         Map<String, TableDataConsistencyCheckResult> actual = new HashMap<>();
         actual.put("test", new TableDataConsistencyCheckResult(true));
-        
governanceRepositoryAPI.getJobCheckGovernanceRepository().persistCheckJobResult(jobItemContext.getJobId(),
 "j02123", actual);
-        Map<String, TableDataConsistencyCheckResult> checkResult = 
governanceRepositoryAPI.getJobCheckGovernanceRepository().getCheckJobResult(jobItemContext.getJobId(),
 "j02123");
+        
governanceFacade.getJobCheckGovernanceRepository().persistCheckJobResult(jobItemContext.getJobId(),
 "j02123", actual);
+        Map<String, TableDataConsistencyCheckResult> checkResult = 
governanceFacade.getJobCheckGovernanceRepository().getCheckJobResult(jobItemContext.getJobId(),
 "j02123");
         assertThat(checkResult.size(), is(1));
         assertTrue(checkResult.get("test").isMatched());
     }
@@ -135,23 +135,23 @@ class GovernanceRepositoryAPIImplTest {
     @Test
     void assertPersistJobItemProcess() {
         MigrationJobItemContext jobItemContext = mockJobItemContext();
-        
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), "testValue1");
-        
assertFalse(governanceRepositoryAPI.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
 jobItemContext.getShardingItem()).isPresent());
-        
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), "testValue1");
-        Optional<String> actual = 
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
 jobItemContext.getShardingItem());
+        
governanceFacade.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), "testValue1");
+        
assertFalse(governanceFacade.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
 jobItemContext.getShardingItem()).isPresent());
+        
governanceFacade.getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), "testValue1");
+        Optional<String> actual = 
governanceFacade.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
 jobItemContext.getShardingItem());
         assertTrue(actual.isPresent());
         assertThat(actual.get(), is("testValue1"));
-        governanceRepositoryAPI.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(), jobItemContext.getShardingItem(), "testValue2");
-        actual = governanceRepositoryAPI.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(), jobItemContext.getShardingItem());
+        governanceFacade.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(), jobItemContext.getShardingItem(), "testValue2");
+        actual = governanceFacade.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(), jobItemContext.getShardingItem());
         assertTrue(actual.isPresent());
         assertThat(actual.get(), is("testValue2"));
     }
     
     @Test
     void assertPersistJobOffset() {
-        assertFalse(governanceRepositoryAPI.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated());
-        governanceRepositoryAPI.getJobOffsetGovernanceRepository().persist("1", new JobOffsetInfo(true));
-        assertTrue(governanceRepositoryAPI.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated());
+        assertFalse(governanceFacade.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated());
+        governanceFacade.getJobOffsetGovernanceRepository().persist("1", new JobOffsetInfo(true));
+        assertTrue(governanceFacade.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated());
     }
     
     private ClusterPersistRepository getClusterPersistRepository() {
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 075ac4950fe..437293bca13 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -51,7 +51,7 @@ class ConsistencyCheckJobTest {
         ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new PipelineContextKey(InstanceType.PROXY), JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
         String checkJobId = pipelineJobId.marshal();
         Map<String, Object> expectTableCheckPosition = Collections.singletonMap("t_order", 100);
-        PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(checkJobId, 0,
+        PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(checkJobId, 0,
                 YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
         ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob(checkJobId);
         ConsistencyCheckJobItemContext actual = consistencyCheckJob.buildPipelineJobItemContext(
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index fd217a71ec8..309b944d451 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.test.it.data.pipeline.scenario.consistencychec
 
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
@@ -82,7 +82,7 @@ class ConsistencyCheckJobAPITest {
     void assertDropByParentJobId() {
         MigrationJobConfiguration parentJobConfig = jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
         String parentJobId = parentJobConfig.getJobId();
-        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
+        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
         int expectedSequence = 1;
         for (int i = 0; i < 3; i++) {
             String checkJobId = jobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null,
@@ -91,20 +91,20 @@ class ConsistencyCheckJobAPITest {
                     new ConsistencyCheckJobConfiguration(checkJobId, parentJobId, null, null, TypedSPILoader.getService(DatabaseType.class, "H2")), 0, JobStatus.FINISHED, null);
             jobItemManager.persistProgress(checkJobItemContext);
             Map<String, TableDataConsistencyCheckResult> dataConsistencyCheckResult = Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(true));
-            repositoryAPI.getJobCheckGovernanceRepository().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
-            Optional<String> latestCheckJobId = repositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+            governanceFacade.getJobCheckGovernanceRepository().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
+            Optional<String> latestCheckJobId = governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
             assertTrue(latestCheckJobId.isPresent());
             assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()), is(expectedSequence++));
         }
         expectedSequence = 2;
         for (int i = 0; i < 2; i++) {
             jobAPI.dropByParentJobId(parentJobId);
-            Optional<String> latestCheckJobId = repositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+            Optional<String> latestCheckJobId = governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
             assertTrue(latestCheckJobId.isPresent());
             assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()), is(expectedSequence--));
         }
         jobAPI.dropByParentJobId(parentJobId);
-        Optional<String> latestCheckJobId = repositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+        Optional<String> latestCheckJobId = governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
         assertFalse(latestCheckJobId.isPresent());
     }
 }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 7e0cdfdab72..9c3418ef846 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -303,7 +303,7 @@ class MigrationJobAPITest {
         YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new YamlInventoryIncrementalJobItemProgress();
         yamlJobItemProgress.setStatus(JobStatus.RUNNING.name());
         yamlJobItemProgress.setSourceDatabaseType("MySQL");
-        PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(), 0, YamlEngine.marshal(yamlJobItemProgress));
+        PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(), 0, YamlEngine.marshal(yamlJobItemProgress));
         List<InventoryIncrementalJobItemInfo> jobItemInfos = inventoryIncrementalJobManager.getJobItemInfos(jobId.get());
         assertThat(jobItemInfos.size(), is(1));
         InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
@@ -320,7 +320,7 @@ class MigrationJobAPITest {
         yamlJobItemProgress.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK.name());
         yamlJobItemProgress.setProcessedRecordsCount(100);
         yamlJobItemProgress.setInventoryRecordsCount(50);
-        PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(), 0, YamlEngine.marshal(yamlJobItemProgress));
+        PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(), 0, YamlEngine.marshal(yamlJobItemProgress));
         List<InventoryIncrementalJobItemInfo> jobItemInfos = inventoryIncrementalJobManager.getJobItemInfos(jobId.get());
         InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
         assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.EXECUTE_INCREMENTAL_TASK));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index d23437a551a..3389cb2555c 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextMan
 import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
@@ -61,9 +61,9 @@ class MigrationDataConsistencyCheckerTest {
         jobConfigurationPOJO.setJobParameter(YamlEngine.marshal(new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
         jobConfigurationPOJO.setJobName(jobConfig.getJobId());
         jobConfigurationPOJO.setShardingTotalCount(1);
-        GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
+        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
         getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
-        governanceRepositoryAPI.getJobItemProcessGovernanceRepository().persist(jobConfig.getJobId(), 0, "");
+        governanceFacade.getJobItemProcessGovernanceRepository().persist(jobConfig.getJobId(), 0, "");
         Map<String, TableDataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null),
                 createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE", null);
         String checkKey = "t_order";
