This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new faf380b73a2 Refactor GovernanceRepositoryAPI (#29129)
faf380b73a2 is described below
commit faf380b73a280fcf94160d74cebcd3fd8246a5d3
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 23 12:11:49 2023 +0800
Refactor GovernanceRepositoryAPI (#29129)
* Add PipelineJobConfigurationGovernanceRepository
* Add PipelineJobOffsetGovernanceRepository
* Add PipelineJobItemProcessGovernanceRepository
* Add PipelineJobItemErrorMessageGovernanceRepository
* Add PipelineJobCheckGovernanceRepository
* Add PipelineJobGovernanceRepository
* Refactor PipelineJobOffsetGovernanceRepository
* Add PipelineMetaDataDataSourceGovernanceRepository
* Add PipelineMetaDataProcessConfigurationGovernanceRepository
* Add PipelineMetaDataProcessConfigurationGovernanceRepository
* Fix test case
* Fix test case
---
.../repository/GovernanceRepositoryAPI.java | 206 ++++-----------------
.../repository/GovernanceRepositoryAPIImpl.java | 176 +++---------------
.../PipelineJobCheckGovernanceRepository.java | 134 ++++++++++++++
...pelineJobConfigurationGovernanceRepository.java | 53 ++++++
.../PipelineJobGovernanceRepository.java | 51 +++++
...ineJobItemErrorMessageGovernanceRepository.java | 53 ++++++
...PipelineJobItemProcessGovernanceRepository.java | 68 +++++++
.../PipelineJobOffsetGovernanceRepository.java | 71 +++++++
...lineMetaDataDataSourceGovernanceRepository.java | 51 +++++
...taProcessConfigurationGovernanceRepository.java | 51 +++++
.../service/PipelineJobIteErrorMessageManager.java | 6 +-
.../core/job/service/PipelineJobItemManager.java | 10 +-
.../core/job/service/PipelineJobManager.java | 12 +-
.../metadata/PipelineDataSourcePersistService.java | 4 +-
...PipelineProcessConfigurationPersistService.java | 4 +-
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 8 +-
.../api/impl/ConsistencyCheckJobAPI.java | 24 +--
.../task/ConsistencyCheckTasksRunner.java | 3 +-
.../migration/api/impl/MigrationJobAPI.java | 2 +-
.../migration/prepare/MigrationJobPreparer.java | 4 +-
.../service/GovernanceRepositoryAPIImplTest.java | 113 ++++++-----
.../consistencycheck/ConsistencyCheckJobTest.java | 2 +-
.../api/impl/ConsistencyCheckJobAPITest.java | 8 +-
.../migration/api/impl/MigrationJobAPITest.java | 4 +-
.../MigrationDataConsistencyCheckerTest.java | 2 +-
25 files changed, 688 insertions(+), 432 deletions(-)
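
The net effect on call sites: the flat methods on GovernanceRepositoryAPI are replaced by per-concern repositories reached through getters. A minimal, hedged sketch of that migration, using only method names that appear in the diffs below (the jobId, shardingItem and jobConfigPOJO values are illustrative placeholders, and the API instance is assumed to come from PipelineAPIFactory.getGovernanceRepositoryAPI(...) as in the changed callers):

import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;

import java.util.Optional;

final class CallSiteMigrationSketch {

    // Before: repositoryAPI.persistJobConfiguration(jobId, jobConfigPOJO);
    // After: navigate to the job configuration repository first.
    void persistJobConfiguration(final GovernanceRepositoryAPI repositoryAPI, final String jobId, final JobConfigurationPOJO jobConfigPOJO) {
        repositoryAPI.getJobConfigurationGovernanceRepository().persist(jobId, jobConfigPOJO);
    }

    // Before: repositoryAPI.getJobItemProgress(jobId, shardingItem);
    // After: job item progress now lives on the job item process repository and is read via load().
    Optional<String> loadJobItemProgress(final GovernanceRepositoryAPI repositoryAPI, final String jobId, final int shardingItem) {
        return repositoryAPI.getJobItemProcessGovernanceRepository().load(jobId, shardingItem);
    }
}
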
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
index 87496931229..930d24226b0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
@@ -17,215 +17,73 @@
package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
/**
* Governance repository API.
*/
public interface GovernanceRepositoryAPI {
/**
- * Watch pipeLine root path.
- *
- * @param listener data changed event listener
- */
- void watchPipeLineRootPath(DataChangedEventListener listener);
-
- /**
- * Whether job configuration existed.
- *
- * @param jobId jobId
- * @return job configuration exist or not
- */
- boolean isJobConfigurationExisted(String jobId);
-
- /**
- * Persist job offset info.
- *
- * @param jobId job id
- * @param jobOffsetInfo job offset info
- */
- void persistJobOffsetInfo(String jobId, JobOffsetInfo jobOffsetInfo);
-
- /**
- * Get job offset info.
- *
- * @param jobId job id
- * @return job offset info
- */
- JobOffsetInfo getJobOffsetInfo(String jobId);
-
- /**
- * Persist job item progress.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @param progressValue progress value
- */
- void persistJobItemProgress(String jobId, int shardingItem, String
progressValue);
-
- /**
- * Update job item progress.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @param progressValue progress value
- */
- void updateJobItemProgress(String jobId, int shardingItem, String
progressValue);
-
- /**
- * Get job item progress.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @return job item progress
- */
- Optional<String> getJobItemProgress(String jobId, int shardingItem);
-
- /**
- * Get latest check job id.
- *
- * @param parentJobId parent job id
- * @return check job id
- */
- Optional<String> getLatestCheckJobId(String parentJobId);
-
- /**
- * Persist latest check job id.
- *
- * @param parentJobId job id
- * @param checkJobId check job id
- */
- void persistLatestCheckJobId(String parentJobId, String checkJobId);
-
- /**
- * Delete latest check job id.
- *
- * @param parentJobId parent job id
- */
- void deleteLatestCheckJobId(String parentJobId);
-
- /**
- * Get check job result.
- *
- * @param parentJobId parent job id
- * @param checkJobId check job id
- * @return check job result
- */
- Map<String, TableDataConsistencyCheckResult> getCheckJobResult(String
parentJobId, String checkJobId);
-
- /**
- * Persist check job result.
- *
- * @param parentJobId parent job id
- * @param checkJobId check job id
- * @param checkResultMap check result map
- */
- void persistCheckJobResult(String parentJobId, String checkJobId,
Map<String, TableDataConsistencyCheckResult> checkResultMap);
-
- /**
- * Delete check job result.
- *
- * @param parentJobId parent job id
- * @param checkJobId check job id
- */
- void deleteCheckJobResult(String parentJobId, String checkJobId);
-
- /**
- * List check job ids.
- *
- * @param parentJobId parent job id
- * @return check job ids
- */
- Collection<String> listCheckJobIds(String parentJobId);
-
- /**
- * Delete job.
- *
- * @param jobId job id
- */
- void deleteJob(String jobId);
-
- /**
- * Persist job root info.
- *
- * @param jobId job ID
- * @param jobClass job class
+ * Get job configuration governance repository.
+ *
+ * @return job configuration governance repository
*/
- void persistJobRootInfo(String jobId, Class<? extends PipelineJob>
jobClass);
+ PipelineJobConfigurationGovernanceRepository
getJobConfigurationGovernanceRepository();
/**
- * Persist job configuration.
+ * Get job offset governance repository.
*
- * @param jobId job ID
- * @param jobConfigPOJO job configuration POJO
+ * @return job offset governance repository
*/
- void persistJobConfiguration(String jobId, JobConfigurationPOJO
jobConfigPOJO);
+ PipelineJobOffsetGovernanceRepository getJobOffsetGovernanceRepository();
/**
- * Update job item error message.
+ * Get job item process governance repository.
*
- * @param jobId job ID
- * @param shardingItem sharding item
- * @param errorMessage error message
+ * @return job item process governance repository
*/
- void updateJobItemErrorMessage(String jobId, int shardingItem, String
errorMessage);
+ PipelineJobItemProcessGovernanceRepository
getJobItemProcessGovernanceRepository();
/**
- * Get sharding items of job.
- *
- * @param jobId job id
- * @return sharding items
+ * Get job item error message governance repository.
+ *
+ * @return job item error message governance repository
*/
- List<Integer> getShardingItems(String jobId);
+ PipelineJobItemErrorMessageGovernanceRepository
getJobItemErrorMessageGovernanceRepository();
/**
- * Get meta data data sources.
- *
- * @param jobType job type
- * @return data source properties
+ * Get job check governance repository.
+ *
+ * @return job check governance repository
*/
- String getMetaDataDataSources(String jobType);
+ PipelineJobCheckGovernanceRepository getJobCheckGovernanceRepository();
/**
- * Persist meta data data sources.
- *
- * @param jobType job type
- * @param metaDataDataSources data source properties
+ * Get job governance repository.
+ *
+ * @return job governance repository
*/
- void persistMetaDataDataSources(String jobType, String
metaDataDataSources);
+ PipelineJobGovernanceRepository getJobGovernanceRepository();
/**
- * Get meta data process configuration.
+ * Get meta data data source governance repository.
*
- * @param jobType job type, nullable
- * @return process configuration YAML text
+ * @return meta data data source governance repository
*/
- String getMetaDataProcessConfiguration(String jobType);
+ PipelineMetaDataDataSourceGovernanceRepository
getMetaDataDataSourceGovernanceRepository();
/**
- * Persist meta data process configuration.
- *
- * @param jobType job type, nullable
- * @param processConfigYamlText process configuration YAML text
+ * Get meta data process configuration governance repository.
+ *
+ * @return meta data process configuration governance repository
*/
- void persistMetaDataProcessConfiguration(String jobType, String
processConfigYamlText);
+ PipelineMetaDataProcessConfigurationGovernanceRepository
getMetaDataProcessConfigurationGovernanceRepository();
/**
- * Get job item error msg.
+ * Watch pipeLine root path.
*
- * @param jobId job id
- * @param shardingItem sharding item
- * @return error msg
+ * @param listener data changed event listener
*/
- String getJobItemErrorMessage(String jobId, int shardingItem);
+ void watchPipeLineRootPath(DataChangedEventListener listener);
}
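
After this change the facade is mostly a set of accessors: one getter per concern plus the existing watch method. A hedged navigation sketch (the listener and jobId are assumed to be supplied by the caller):

import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;

final class FacadeNavigationSketch {

    boolean watchAndCheck(final GovernanceRepositoryAPI repositoryAPI, final DataChangedEventListener listener, final String jobId) {
        // The only behavioural method left on the facade itself.
        repositoryAPI.watchPipeLineRootPath(listener);
        // Every other operation goes through a per-concern repository.
        return repositoryAPI.getJobConfigurationGovernanceRepository().isExisted(jobId);
    }
}
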
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
index 1ea1dee6c94..2605c893bfb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
@@ -17,181 +17,49 @@
package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
-import com.google.common.base.Strings;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import lombok.Getter;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
/**
* Governance repository API impl.
*/
-@RequiredArgsConstructor
-@Slf4j
+@Getter
public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAPI {
private final ClusterPersistRepository repository;
- @Override
- public void watchPipeLineRootPath(final DataChangedEventListener listener)
{
- repository.watch(PipelineNodePath.DATA_PIPELINE_ROOT, listener);
- }
-
- @Override
- public boolean isJobConfigurationExisted(final String jobId) {
- return null !=
repository.getDirectly(PipelineMetaDataNode.getJobConfigurationPath(jobId));
- }
-
- @Override
- public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo
jobOffsetInfo) {
- repository.persist(PipelineMetaDataNode.getJobOffsetPath(jobId),
YamlEngine.marshal(new
YamlJobOffsetInfoSwapper().swapToYamlConfiguration(jobOffsetInfo)));
- }
-
- @Override
- public JobOffsetInfo getJobOffsetInfo(final String jobId) {
- String value =
repository.getDirectly(PipelineMetaDataNode.getJobOffsetPath(jobId));
- return new
YamlJobOffsetInfoSwapper().swapToObject(Strings.isNullOrEmpty(value) ? new
YamlJobOffsetInfo() : YamlEngine.unmarshal(value, YamlJobOffsetInfo.class));
- }
-
- @Override
- public void persistJobItemProgress(final String jobId, final int
shardingItem, final String progressValue) {
- repository.persist(PipelineMetaDataNode.getJobOffsetItemPath(jobId,
shardingItem), progressValue);
- }
-
- @Override
- public void updateJobItemProgress(final String jobId, final int
shardingItem, final String progressValue) {
- repository.update(PipelineMetaDataNode.getJobOffsetItemPath(jobId,
shardingItem), progressValue);
- }
-
- @Override
- public Optional<String> getJobItemProgress(final String jobId, final int
shardingItem) {
- String text =
repository.getDirectly(PipelineMetaDataNode.getJobOffsetItemPath(jobId,
shardingItem));
- return Strings.isNullOrEmpty(text) ? Optional.empty() :
Optional.of(text);
- }
-
- @Override
- public Optional<String> getLatestCheckJobId(final String parentJobId) {
- return
Optional.ofNullable(repository.getDirectly(PipelineMetaDataNode.getLatestCheckJobIdPath(parentJobId)));
- }
-
- @Override
- public void persistLatestCheckJobId(final String parentJobId, final String
checkJobId) {
-
repository.persist(PipelineMetaDataNode.getLatestCheckJobIdPath(parentJobId),
String.valueOf(checkJobId));
- }
-
- @Override
- public void deleteLatestCheckJobId(final String parentJobId) {
-
repository.delete(PipelineMetaDataNode.getLatestCheckJobIdPath(parentJobId));
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Map<String, TableDataConsistencyCheckResult>
getCheckJobResult(final String parentJobId, final String checkJobId) {
- String yamlCheckResultMapText =
repository.getDirectly(PipelineMetaDataNode.getCheckJobResultPath(parentJobId,
checkJobId));
- if (Strings.isNullOrEmpty(yamlCheckResultMapText)) {
- return Collections.emptyMap();
- }
- YamlTableDataConsistencyCheckResultSwapper swapper = new
YamlTableDataConsistencyCheckResultSwapper();
- Map<String, String> yamlCheckResultMap =
YamlEngine.unmarshal(yamlCheckResultMapText, Map.class, true);
- Map<String, TableDataConsistencyCheckResult> result = new
HashMap<>(yamlCheckResultMap.size(), 1F);
- for (Entry<String, String> entry : yamlCheckResultMap.entrySet()) {
- result.put(entry.getKey(), swapper.swapToObject(entry.getValue()));
- }
- return result;
- }
-
- @Override
- public void persistCheckJobResult(final String parentJobId, final String
checkJobId, final Map<String, TableDataConsistencyCheckResult> checkResultMap) {
- if (null == checkResultMap) {
- return;
- }
- Map<String, String> yamlCheckResultMap = new LinkedHashMap<>();
- for (Entry<String, TableDataConsistencyCheckResult> entry :
checkResultMap.entrySet()) {
- YamlTableDataConsistencyCheckResult yamlCheckResult = new
YamlTableDataConsistencyCheckResultSwapper().swapToYamlConfiguration(entry.getValue());
- yamlCheckResultMap.put(entry.getKey(),
YamlEngine.marshal(yamlCheckResult));
- }
-
repository.persist(PipelineMetaDataNode.getCheckJobResultPath(parentJobId,
checkJobId), YamlEngine.marshal(yamlCheckResultMap));
- }
-
- @Override
- public void deleteCheckJobResult(final String parentJobId, final String
checkJobId) {
-
repository.delete(PipelineMetaDataNode.getCheckJobResultPath(parentJobId,
checkJobId));
- }
-
- @Override
- public Collection<String> listCheckJobIds(final String parentJobId) {
- return
repository.getChildrenKeys(PipelineMetaDataNode.getCheckJobIdsRootPath(parentJobId));
- }
-
- @Override
- public void deleteJob(final String jobId) {
- repository.delete(PipelineMetaDataNode.getJobRootPath(jobId));
- }
+ private final PipelineJobConfigurationGovernanceRepository
jobConfigurationGovernanceRepository;
- @Override
- public void persistJobRootInfo(final String jobId, final Class<? extends
PipelineJob> jobClass) {
- repository.persist(PipelineMetaDataNode.getJobRootPath(jobId),
jobClass.getName());
- }
+ private final PipelineJobOffsetGovernanceRepository
jobOffsetGovernanceRepository;
- @Override
- public void persistJobConfiguration(final String jobId, final
JobConfigurationPOJO jobConfigPOJO) {
-
repository.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId),
YamlEngine.marshal(jobConfigPOJO));
- }
+ private final PipelineJobItemProcessGovernanceRepository
jobItemProcessGovernanceRepository;
- @Override
- public void updateJobItemErrorMessage(final String jobId, final int
shardingItem, final String errorMessage) {
-
repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem), errorMessage);
- }
+ private final PipelineJobItemErrorMessageGovernanceRepository
jobItemErrorMessageGovernanceRepository;
- @Override
- public List<Integer> getShardingItems(final String jobId) {
- List<String> result =
repository.getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
- return
result.stream().map(Integer::parseInt).collect(Collectors.toList());
- }
+ private final PipelineJobCheckGovernanceRepository
jobCheckGovernanceRepository;
- @Override
- public String getMetaDataDataSources(final String jobType) {
- return
repository.getDirectly(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType));
- }
+ private final PipelineJobGovernanceRepository jobGovernanceRepository;
- @Override
- public void persistMetaDataDataSources(final String jobType, final String
metaDataDataSources) {
-
repository.persist(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType),
metaDataDataSources);
- }
+ private final PipelineMetaDataDataSourceGovernanceRepository
metaDataDataSourceGovernanceRepository;
- @Override
- public String getMetaDataProcessConfiguration(final String jobType) {
- return
repository.getDirectly(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType));
- }
+ private final PipelineMetaDataProcessConfigurationGovernanceRepository
metaDataProcessConfigurationGovernanceRepository;
- @Override
- public void persistMetaDataProcessConfiguration(final String jobType,
final String processConfigYamlText) {
-
repository.persist(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType),
processConfigYamlText);
+ public GovernanceRepositoryAPIImpl(final ClusterPersistRepository
repository) {
+ this.repository = repository;
+ jobConfigurationGovernanceRepository = new
PipelineJobConfigurationGovernanceRepository(repository);
+ jobOffsetGovernanceRepository = new
PipelineJobOffsetGovernanceRepository(repository);
+ jobItemProcessGovernanceRepository = new
PipelineJobItemProcessGovernanceRepository(repository);
+ jobItemErrorMessageGovernanceRepository = new
PipelineJobItemErrorMessageGovernanceRepository(repository);
+ jobCheckGovernanceRepository = new
PipelineJobCheckGovernanceRepository(repository);
+ jobGovernanceRepository = new
PipelineJobGovernanceRepository(repository);
+ metaDataDataSourceGovernanceRepository = new
PipelineMetaDataDataSourceGovernanceRepository(repository);
+ metaDataProcessConfigurationGovernanceRepository = new
PipelineMetaDataProcessConfigurationGovernanceRepository(repository);
}
@Override
- public String getJobItemErrorMessage(final String jobId, final int
shardingItem) {
- return
repository.getDirectly(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem));
+ public void watchPipeLineRootPath(final DataChangedEventListener listener)
{
+ repository.watch(PipelineNodePath.DATA_PIPELINE_ROOT, listener);
}
}
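
The implementation now only carries wiring: its constructor builds each per-concern repository around the same ClusterPersistRepository, and Lombok's @Getter exposes them. A minimal construction sketch (the ClusterPersistRepository instance is assumed to be provided by cluster mode, as at the existing call sites):

import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPIImpl;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;

final class GovernanceWiringSketch {

    GovernanceRepositoryAPI wire(final ClusterPersistRepository clusterRepository) {
        // One constructor call fans out into all eight sub-repositories,
        // each sharing the same underlying cluster repository.
        return new GovernanceRepositoryAPIImpl(clusterRepository);
    }
}
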
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobCheckGovernanceRepository.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobCheckGovernanceRepository.java
new file mode 100644
index 00000000000..e5c44dfdd3c
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobCheckGovernanceRepository.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+
+import com.google.common.base.Strings;
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+/**
+ * Pipeline job check governance repository.
+ */
+@RequiredArgsConstructor
+public final class PipelineJobCheckGovernanceRepository {
+
+ private final ClusterPersistRepository repository;
+
+ /**
+ * Get latest check job id.
+ *
+ * @param parentJobId parent job id
+ * @return check job id
+ */
+ public Optional<String> getLatestCheckJobId(final String parentJobId) {
+ return
Optional.ofNullable(repository.getDirectly(PipelineMetaDataNode.getLatestCheckJobIdPath(parentJobId)));
+ }
+
+ /**
+ * Persist latest check job id.
+ *
+ * @param parentJobId job id
+ * @param checkJobId check job id
+ */
+ public void persistLatestCheckJobId(final String parentJobId, final String
checkJobId) {
+
repository.persist(PipelineMetaDataNode.getLatestCheckJobIdPath(parentJobId),
String.valueOf(checkJobId));
+ }
+
+ /**
+ * Delete latest check job id.
+ *
+ * @param parentJobId parent job id
+ */
+ public void deleteLatestCheckJobId(final String parentJobId) {
+
repository.delete(PipelineMetaDataNode.getLatestCheckJobIdPath(parentJobId));
+ }
+
+ /**
+ * Get check job result.
+ *
+ * @param parentJobId parent job id
+ * @param checkJobId check job id
+ * @return check job result
+ */
+ @SuppressWarnings("unchecked")
+ public Map<String, TableDataConsistencyCheckResult>
getCheckJobResult(final String parentJobId, final String checkJobId) {
+ String yamlCheckResultMapText =
repository.getDirectly(PipelineMetaDataNode.getCheckJobResultPath(parentJobId,
checkJobId));
+ if (Strings.isNullOrEmpty(yamlCheckResultMapText)) {
+ return Collections.emptyMap();
+ }
+ YamlTableDataConsistencyCheckResultSwapper swapper = new
YamlTableDataConsistencyCheckResultSwapper();
+ Map<String, String> yamlCheckResultMap =
YamlEngine.unmarshal(yamlCheckResultMapText, Map.class, true);
+ Map<String, TableDataConsistencyCheckResult> result = new
HashMap<>(yamlCheckResultMap.size(), 1F);
+ for (Entry<String, String> entry : yamlCheckResultMap.entrySet()) {
+ result.put(entry.getKey(), swapper.swapToObject(entry.getValue()));
+ }
+ return result;
+ }
+
+ /**
+ * Persist check job result.
+ *
+ * @param parentJobId parent job id
+ * @param checkJobId check job id
+ * @param checkResultMap check result map
+ */
+ public void persistCheckJobResult(final String parentJobId, final String
checkJobId, final Map<String, TableDataConsistencyCheckResult> checkResultMap) {
+ if (null == checkResultMap) {
+ return;
+ }
+ Map<String, String> yamlCheckResultMap = new LinkedHashMap<>();
+ for (Entry<String, TableDataConsistencyCheckResult> entry :
checkResultMap.entrySet()) {
+ YamlTableDataConsistencyCheckResult yamlCheckResult = new
YamlTableDataConsistencyCheckResultSwapper().swapToYamlConfiguration(entry.getValue());
+ yamlCheckResultMap.put(entry.getKey(),
YamlEngine.marshal(yamlCheckResult));
+ }
+
repository.persist(PipelineMetaDataNode.getCheckJobResultPath(parentJobId,
checkJobId), YamlEngine.marshal(yamlCheckResultMap));
+ }
+
+ /**
+ * Delete check job result.
+ *
+ * @param parentJobId parent job id
+ * @param checkJobId check job id
+ */
+ public void deleteCheckJobResult(final String parentJobId, final String
checkJobId) {
+
repository.delete(PipelineMetaDataNode.getCheckJobResultPath(parentJobId,
checkJobId));
+ }
+
+ /**
+ * List check job ids.
+ *
+ * @param parentJobId parent job id
+ * @return check job ids
+ */
+ public Collection<String> listCheckJobIds(final String parentJobId) {
+ return
repository.getChildrenKeys(PipelineMetaDataNode.getCheckJobIdsRootPath(parentJobId));
+ }
+}
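
A hedged sketch of the check-result round trip this class offers: results are marshalled to YAML per table on persist and rebuilt on read, and a null result map is silently ignored. The parentJobId, checkJobId and prepared result map are assumed to come from the consistency check job:

import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineJobCheckGovernanceRepository;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;

import java.util.Map;

final class CheckRepositorySketch {

    Map<String, TableDataConsistencyCheckResult> saveAndReload(final PipelineJobCheckGovernanceRepository checkRepository,
                                                               final String parentJobId, final String checkJobId,
                                                               final Map<String, TableDataConsistencyCheckResult> checkResultMap) {
        // Remember which check job ran last for this parent job, then store its result.
        checkRepository.persistLatestCheckJobId(parentJobId, checkJobId);
        checkRepository.persistCheckJobResult(parentJobId, checkJobId, checkResultMap);
        // Reading back resolves the latest check job id and unmarshals the per-table results.
        String latestCheckJobId = checkRepository.getLatestCheckJobId(parentJobId).orElse(checkJobId);
        return checkRepository.getCheckJobResult(parentJobId, latestCheckJobId);
    }
}
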
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobConfigurationGovernanceRepository.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobConfigurationGovernanceRepository.java
new file mode 100644
index 00000000000..9b6d13bf983
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobConfigurationGovernanceRepository.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+/**
+ * Pipeline job configuration governance repository.
+ */
+@RequiredArgsConstructor
+public final class PipelineJobConfigurationGovernanceRepository {
+
+ private final ClusterPersistRepository repository;
+
+ /**
+ * Whether pipeline job configuration existed.
+ *
+ * @param jobId jobId
+ * @return pipeline job configuration exists or not
+ */
+ public boolean isExisted(final String jobId) {
+ return null !=
repository.getDirectly(PipelineMetaDataNode.getJobConfigurationPath(jobId));
+ }
+
+ /**
+ * Persist pipeline job configuration.
+ *
+ * @param jobId job ID
+ * @param jobConfigPOJO job configuration POJO
+ */
+ public void persist(final String jobId, final JobConfigurationPOJO
jobConfigPOJO) {
+
repository.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId),
YamlEngine.marshal(jobConfigPOJO));
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
new file mode 100644
index 00000000000..a403f0f7b0e
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+/**
+ * Pipeline job governance repository.
+ */
+@RequiredArgsConstructor
+public final class PipelineJobGovernanceRepository {
+
+ private final ClusterPersistRepository repository;
+
+ /**
+ * Create job.
+ *
+ * @param jobId job ID
+ * @param jobClass job class
+ */
+ public void create(final String jobId, final Class<? extends PipelineJob>
jobClass) {
+ repository.persist(PipelineMetaDataNode.getJobRootPath(jobId),
jobClass.getName());
+ }
+
+ /**
+ * Delete job.
+ *
+ * @param jobId job id
+ */
+ public void delete(final String jobId) {
+ repository.delete(PipelineMetaDataNode.getJobRootPath(jobId));
+ }
+}
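
These two methods are renames of persistJobRootInfo and deleteJob from the old API: create() records the concrete job class name at the job root node, and delete() removes the whole job subtree. A small illustrative sketch (the job class is assumed to be whatever the calling JobAPI's getJobClass() returns, as in the changed callers below):

import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineJobGovernanceRepository;

final class JobLifecycleSketch {

    void recreate(final PipelineJobGovernanceRepository jobRepository, final String jobId, final Class<? extends PipelineJob> jobClass) {
        // Drop any stale job subtree, then register the job root with its class name.
        jobRepository.delete(jobId);
        jobRepository.create(jobId, jobClass);
    }
}
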
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemErrorMessageGovernanceRepository.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemErrorMessageGovernanceRepository.java
new file mode 100644
index 00000000000..eee1bd77b57
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemErrorMessageGovernanceRepository.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+/**
+ * Pipeline job item error message governance repository.
+ */
+@RequiredArgsConstructor
+public final class PipelineJobItemErrorMessageGovernanceRepository {
+
+ private final ClusterPersistRepository repository;
+
+ /**
+ * Update job item error message.
+ *
+ * @param jobId job ID
+ * @param shardingItem sharding item
+ * @param errorMessage error message
+ */
+ public void update(final String jobId, final int shardingItem, final
String errorMessage) {
+
repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem), errorMessage);
+ }
+
+ /**
+ * Load job item error msg.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @return error msg
+ */
+ public String load(final String jobId, final int shardingItem) {
+ return
repository.getDirectly(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem));
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemProcessGovernanceRepository.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemProcessGovernanceRepository.java
new file mode 100644
index 00000000000..7ea4887fdb8
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemProcessGovernanceRepository.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+
+import com.google.common.base.Strings;
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+import java.util.Optional;
+
+/**
+ * Pipeline job item process governance repository.
+ */
+@RequiredArgsConstructor
+public final class PipelineJobItemProcessGovernanceRepository {
+
+ private final ClusterPersistRepository repository;
+
+ /**
+ * Persist job item progress.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @param progressValue progress value
+ */
+ public void persist(final String jobId, final int shardingItem, final
String progressValue) {
+ repository.persist(PipelineMetaDataNode.getJobOffsetItemPath(jobId,
shardingItem), progressValue);
+ }
+
+ /**
+ * Update job item progress.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @param progressValue progress value
+ */
+ public void update(final String jobId, final int shardingItem, final
String progressValue) {
+ repository.update(PipelineMetaDataNode.getJobOffsetItemPath(jobId,
shardingItem), progressValue);
+ }
+
+ /**
+ * Load job item progress.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @return job item progress
+ */
+ public Optional<String> load(final String jobId, final int shardingItem) {
+ String text =
repository.getDirectly(PipelineMetaDataNode.getJobOffsetItemPath(jobId,
shardingItem));
+ return Strings.isNullOrEmpty(text) ? Optional.empty() :
Optional.of(text);
+ }
+}
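
A minimal usage sketch for this repository: persist() creates the item progress node, update() overwrites it, and load() yields Optional.empty() when the node is absent or blank. The progressValue is the YAML text produced by the swappers in PipelineJobItemManager further below; here it is simply passed through:

import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineJobItemProcessGovernanceRepository;

final class JobItemProgressSketch {

    String upsertAndRead(final PipelineJobItemProcessGovernanceRepository processRepository, final String jobId, final int shardingItem, final String progressValue) {
        if (processRepository.load(jobId, shardingItem).isPresent()) {
            processRepository.update(jobId, shardingItem, progressValue);
        } else {
            processRepository.persist(jobId, shardingItem, progressValue);
        }
        return processRepository.load(jobId, shardingItem).orElse("");
    }
}
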
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobOffsetGovernanceRepository.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobOffsetGovernanceRepository.java
new file mode 100644
index 00000000000..412f78e4e6e
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobOffsetGovernanceRepository.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+
+import com.google.common.base.Strings;
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Pipeline job offset governance repository.
+ */
+@RequiredArgsConstructor
+public final class PipelineJobOffsetGovernanceRepository {
+
+ private final ClusterPersistRepository repository;
+
+ /**
+ * Persist job offset info.
+ *
+ * @param jobId job id
+ * @param jobOffsetInfo job offset info
+ */
+ public void persist(final String jobId, final JobOffsetInfo jobOffsetInfo)
{
+ repository.persist(PipelineMetaDataNode.getJobOffsetPath(jobId),
YamlEngine.marshal(new
YamlJobOffsetInfoSwapper().swapToYamlConfiguration(jobOffsetInfo)));
+ }
+
+ /**
+ * Load job offset info.
+ *
+ * @param jobId job id
+ * @return job offset info
+ */
+ public JobOffsetInfo load(final String jobId) {
+ String value =
repository.getDirectly(PipelineMetaDataNode.getJobOffsetPath(jobId));
+ return new
YamlJobOffsetInfoSwapper().swapToObject(Strings.isNullOrEmpty(value) ? new
YamlJobOffsetInfo() : YamlEngine.unmarshal(value, YamlJobOffsetInfo.class));
+ }
+
+ /**
+ * Get sharding items of job.
+ *
+ * @param jobId job id
+ * @return sharding items
+ */
+ public List<Integer> getShardingItems(final String jobId) {
+ List<String> result =
repository.getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
+ return
result.stream().map(Integer::parseInt).collect(Collectors.toList());
+ }
+}
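
Note that load() never returns null here: when nothing is stored it swaps an empty YamlJobOffsetInfo into a default JobOffsetInfo. A hedged usage sketch (the JobOffsetInfo instance is assumed to be built by the job before persisting):

import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineJobOffsetGovernanceRepository;

import java.util.List;

final class JobOffsetSketch {

    List<Integer> persistAndListItems(final PipelineJobOffsetGovernanceRepository offsetRepository, final String jobId, final JobOffsetInfo offsetInfo) {
        offsetRepository.persist(jobId, offsetInfo);
        // load() is always non-null, even before anything has been persisted.
        JobOffsetInfo reloaded = offsetRepository.load(jobId);
        // Sharding items are the numeric children under the job's offset path.
        return offsetRepository.getShardingItems(jobId);
    }
}
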
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataDataSourceGovernanceRepository.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataDataSourceGovernanceRepository.java
new file mode 100644
index 00000000000..870cc0c20dc
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataDataSourceGovernanceRepository.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+/**
+ * Pipeline meta data data source governance repository.
+ */
+@RequiredArgsConstructor
+public final class PipelineMetaDataDataSourceGovernanceRepository {
+
+ private final ClusterPersistRepository repository;
+
+ /**
+ * Persist meta data data sources.
+ *
+ * @param jobType job type
+ * @param metaDataDataSources data source properties
+ */
+ public void persist(final String jobType, final String
metaDataDataSources) {
+
repository.persist(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType),
metaDataDataSources);
+ }
+
+ /**
+ * Load meta data data sources.
+ *
+ * @param jobType job type
+ * @return data source properties
+ */
+ public String load(final String jobType) {
+ return
repository.getDirectly(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType));
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataProcessConfigurationGovernanceRepository.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataProcessConfigurationGovernanceRepository.java
new file mode 100644
index 00000000000..e6f3e1ffb58
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataProcessConfigurationGovernanceRepository.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+/**
+ * Pipeline meta data process configuration governance repository.
+ */
+@RequiredArgsConstructor
+public final class PipelineMetaDataProcessConfigurationGovernanceRepository {
+
+ private final ClusterPersistRepository repository;
+
+ /**
+ * Persist meta data process configuration.
+ *
+ * @param jobType job type, nullable
+ * @param processConfigYamlText process configuration YAML text
+ */
+ public void persist(final String jobType, final String
processConfigYamlText) {
+
repository.persist(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType),
processConfigYamlText);
+ }
+
+ /**
+ * Load meta data process configuration.
+ *
+ * @param jobType job type, nullable
+ * @return process configuration YAML text
+ */
+ public String load(final String jobType) {
+ return
repository.getDirectly(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType));
+ }
+}
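
Both meta data repositories above follow the same persist()/load() shape around plain text values; this one stores the process configuration as YAML and, like the old methods it replaces, accepts a nullable jobType. A hedged sketch (the YAML text is assumed to be produced by the swapper in PipelineProcessConfigurationPersistService further below):

import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineMetaDataProcessConfigurationGovernanceRepository;

final class ProcessConfigurationSketch {

    String persistAndReload(final PipelineMetaDataProcessConfigurationGovernanceRepository processConfigRepository, final String jobType, final String processConfigYamlText) {
        processConfigRepository.persist(jobType, processConfigYamlText);
        // Returns the raw YAML text previously persisted (may be null if nothing is stored).
        return processConfigRepository.load(jobType);
    }
}
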
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
index 669d7eebbfe..879b241e58e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
@@ -46,7 +46,7 @@ public final class PipelineJobIteErrorMessageManager {
* @return map, key is sharding item, value is error message
*/
public String getErrorMessage() {
- return
Optional.ofNullable(governanceRepositoryAPI.getJobItemErrorMessage(jobId,
shardingItem)).orElse("");
+ return
Optional.ofNullable(governanceRepositoryAPI.getJobItemErrorMessageGovernanceRepository().load(jobId,
shardingItem)).orElse("");
}
/**
@@ -55,7 +55,7 @@ public final class PipelineJobIteErrorMessageManager {
* @param error error
*/
public void updateErrorMessage(final Object error) {
- governanceRepositoryAPI.updateJobItemErrorMessage(jobId, shardingItem,
null == error ? "" : buildErrorMessage(error));
+
governanceRepositoryAPI.getJobItemErrorMessageGovernanceRepository().update(jobId,
shardingItem, null == error ? "" : buildErrorMessage(error));
}
private String buildErrorMessage(final Object error) {
@@ -66,6 +66,6 @@ public final class PipelineJobIteErrorMessageManager {
* Clean job item error message.
*/
public void cleanErrorMessage() {
- governanceRepositoryAPI.updateJobItemErrorMessage(jobId, shardingItem,
"");
+
governanceRepositoryAPI.getJobItemErrorMessageGovernanceRepository().update(jobId,
shardingItem, "");
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
index c6e6ec47e5a..78a880c68c9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
@@ -54,8 +54,8 @@ public final class PipelineJobItemManager<T extends
PipelineJobItemProgress> {
return;
}
jobItemProgress.get().setStatus(status);
- PipelineAPIFactory.getGovernanceRepositoryAPI(
-
PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
shardingItem,
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId))
+ .getJobItemProcessGovernanceRepository().update(jobId,
shardingItem,
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
}
/**
@@ -66,7 +66,7 @@ public final class PipelineJobItemManager<T extends
PipelineJobItemProgress> {
* @return job item progress
*/
public Optional<T> getProgress(final String jobId, final int shardingItem)
{
- return
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId,
shardingItem)
+ return
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().load(jobId,
shardingItem)
.map(optional ->
swapper.swapToObject(YamlEngine.unmarshal(optional,
swapper.getYamlProgressClass(), true)));
}
@@ -77,7 +77,7 @@ public final class PipelineJobItemManager<T extends
PipelineJobItemProgress> {
*/
public void persistProgress(final PipelineJobItemContext jobItemContext) {
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
- .persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
+
.getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
}
/**
@@ -87,7 +87,7 @@ public final class PipelineJobItemManager<T extends
PipelineJobItemProgress> {
*/
public void updateProgress(final PipelineJobItemContext jobItemContext) {
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
- .updateJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
+
.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
}
@SuppressWarnings("unchecked")
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 9dfa1c4f62a..a3a89ccf6e6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -75,12 +75,12 @@ public final class PipelineJobManager {
String jobId = jobConfig.getJobId();
ShardingSpherePreconditions.checkState(0 !=
jobConfig.getJobShardingCount(), () -> new
PipelineJobCreationWithInvalidShardingCountException(jobId));
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
- if (repositoryAPI.isJobConfigurationExisted(jobId)) {
+ if
(repositoryAPI.getJobConfigurationGovernanceRepository().isExisted(jobId)) {
log.warn("jobId already exists in registry center, ignore, job id
is `{}`", jobId);
return Optional.of(jobId);
}
- repositoryAPI.persistJobRootInfo(jobId, jobAPI.getJobClass());
- repositoryAPI.persistJobConfiguration(jobId,
jobConfig.convertToJobConfigurationPOJO());
+ repositoryAPI.getJobGovernanceRepository().create(jobId,
jobAPI.getJobClass());
+ repositoryAPI.getJobConfigurationGovernanceRepository().persist(jobId,
jobConfig.convertToJobConfigurationPOJO());
return Optional.of(jobId);
}
@@ -119,7 +119,7 @@ public final class PipelineJobManager {
}
private void startNextDisabledJob(final String jobId, final String
toBeStartDisabledNextJobType) {
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional
-> {
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional
-> {
try {
new
PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
toBeStartDisabledNextJobType)).startDisabledJob(optional);
// CHECKSTYLE:OFF
@@ -141,7 +141,7 @@ public final class PipelineJobManager {
}
private void stopPreviousJob(final String jobId, final String
toBeStoppedPreviousJobType) {
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional
-> {
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional
-> {
try {
new
PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
toBeStoppedPreviousJobType)).stop(optional);
// CHECKSTYLE:OFF
@@ -176,7 +176,7 @@ public final class PipelineJobManager {
public void drop(final String jobId) {
PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(jobId);
PipelineAPIFactory.getJobOperateAPI(contextKey).remove(String.valueOf(jobId),
null);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).deleteJob(jobId);
+
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getJobGovernanceRepository().delete(jobId);
}
/**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
index 00119f7d408..42e0035890d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
@@ -39,7 +39,7 @@ public final class PipelineDataSourcePersistService
implements PipelineMetaDataP
@Override
@SuppressWarnings("unchecked")
public Map<String, DataSourcePoolProperties> load(final PipelineContextKey
contextKey, final String jobType) {
- String dataSourcesProps =
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataDataSources(jobType);
+ String dataSourcesProps =
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataDataSourceGovernanceRepository().load(jobType);
if (Strings.isNullOrEmpty(dataSourcesProps)) {
return Collections.emptyMap();
}
@@ -55,6 +55,6 @@ public final class PipelineDataSourcePersistService
implements PipelineMetaDataP
for (Entry<String, DataSourcePoolProperties> entry :
propsMap.entrySet()) {
dataSourceMap.put(entry.getKey(),
swapper.swapToMap(entry.getValue()));
}
-
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).persistMetaDataDataSources(jobType,
YamlEngine.marshal(dataSourceMap));
+
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataDataSourceGovernanceRepository().persist(jobType,
YamlEngine.marshal(dataSourceMap));
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
index 4df71589bf2..b20a5e22935 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
@@ -34,7 +34,7 @@ public final class PipelineProcessConfigurationPersistService
implements Pipelin
@Override
public PipelineProcessConfiguration load(final PipelineContextKey
contextKey, final String jobType) {
- String yamlText =
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataProcessConfiguration(jobType);
+ String yamlText =
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataProcessConfigurationGovernanceRepository().load(jobType);
if (Strings.isNullOrEmpty(yamlText)) {
return null;
}
@@ -45,6 +45,6 @@ public final class PipelineProcessConfigurationPersistService
implements Pipelin
@Override
public void persist(final PipelineContextKey contextKey, final String
jobType, final PipelineProcessConfiguration processConfig) {
String yamlText =
YamlEngine.marshal(swapper.swapToYamlConfiguration(processConfig));
-
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).persistMetaDataProcessConfiguration(jobType,
yamlText);
+
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataProcessConfigurationGovernanceRepository().persist(jobType,
yamlText);
}
}
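
Process configuration follows the same load/persist shape through getMetaDataProcessConfigurationGovernanceRepository(); a sketch (not part of the commit) with the YAML text supplied by the caller:

    // Sketch only: persist then reload the process configuration YAML for a job type.
    private String reloadProcessConfiguration(final PipelineContextKey contextKey, final String jobType, final String yamlText) {
        PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataProcessConfigurationGovernanceRepository().persist(jobType, yamlText);
        return PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataProcessConfigurationGovernanceRepository().load(jobType);
    }
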
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 0fdff431d4b..a46629de46d 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -121,13 +121,13 @@ public final class CDCJobAPI implements
InventoryIncrementalJobAPI {
CDCJobConfiguration jobConfig = new
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
ShardingSpherePreconditions.checkState(0 !=
jobConfig.getJobShardingCount(), () -> new
PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
- if (repositoryAPI.isJobConfigurationExisted(jobConfig.getJobId())) {
+ if
(repositoryAPI.getJobConfigurationGovernanceRepository().isExisted(jobConfig.getJobId()))
{
log.warn("CDC job already exists in registry center, ignore, job
id is `{}`", jobConfig.getJobId());
} else {
- repositoryAPI.persistJobRootInfo(jobConfig.getJobId(),
getJobClass());
+
repositoryAPI.getJobGovernanceRepository().create(jobConfig.getJobId(),
getJobClass());
JobConfigurationPOJO jobConfigPOJO =
jobConfig.convertToJobConfigurationPOJO();
jobConfigPOJO.setDisabled(true);
- repositoryAPI.persistJobConfiguration(jobConfig.getJobId(),
jobConfigPOJO);
+
repositoryAPI.getJobConfigurationGovernanceRepository().persist(jobConfig.getJobId(),
jobConfigPOJO);
if (!param.isFull()) {
initIncrementalPosition(jobConfig);
}
@@ -177,7 +177,7 @@ public final class CDCJobAPI implements
InventoryIncrementalJobAPI {
}
IncrementalDumperContext dumperContext =
buildDumperContext(jobConfig, i, new
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
InventoryIncrementalJobItemProgress jobItemProgress =
getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager,
dumperContext);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().persist(
jobId, i,
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
}
} catch (final SQLException ex) {
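
For the CDC path, job registration now checks and persists the configuration via getJobConfigurationGovernanceRepository(). A hedged sketch of that guard (not part of the commit; the helper name and boolean return are illustrative):

    // Sketch only: register a job configuration unless it already exists in the registry center.
    private boolean persistJobConfigurationIfAbsent(final GovernanceRepositoryAPI repositoryAPI, final String jobId, final JobConfigurationPOJO jobConfigPOJO) {
        if (repositoryAPI.getJobConfigurationGovernanceRepository().isExisted(jobId)) {
            // Already registered; keep the existing configuration.
            return false;
        }
        repositoryAPI.getJobConfigurationGovernanceRepository().persist(jobId, jobConfigPOJO);
        return true;
    }
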
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index f9c1db64a65..e6b5c4a2ac9 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -80,7 +80,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
public String createJobAndStart(final CreateConsistencyCheckJobParameter
param) {
String parentJobId = param.getParentJobId();
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
- Optional<String> latestCheckJobId =
repositoryAPI.getLatestCheckJobId(parentJobId);
+ Optional<String> latestCheckJobId =
repositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
if (latestCheckJobId.isPresent()) {
PipelineJobItemManager<ConsistencyCheckJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
Optional<ConsistencyCheckJobItemProgress> progress =
jobItemManager.getProgress(latestCheckJobId.get(), 0);
@@ -92,8 +92,8 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
verifyPipelineDatabaseType(param);
PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(parentJobId);
String result = latestCheckJobId.map(s -> new
ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new
ConsistencyCheckJobId(contextKey, parentJobId)).marshal();
- repositoryAPI.persistLatestCheckJobId(parentJobId, result);
- repositoryAPI.deleteCheckJobResult(parentJobId, result);
+
repositoryAPI.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
result);
+
repositoryAPI.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId,
result);
new PipelineJobManager(this).drop(result);
YamlConsistencyCheckJobConfiguration yamlConfig = new
YamlConsistencyCheckJobConfiguration();
yamlConfig.setJobId(result);
@@ -126,7 +126,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
}
private String getLatestCheckJobId(final String parentJobId) {
- Optional<String> result =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).getLatestCheckJobId(parentJobId);
+ Optional<String> result =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(result.isPresent(), () -> new
ConsistencyCheckJobNotFoundException(parentJobId));
return result.get();
}
@@ -150,16 +150,16 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
new PipelineJobManager(this).stop(latestCheckJobId);
PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(parentJobId);
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey);
- Collection<String> checkJobIds =
repositoryAPI.listCheckJobIds(parentJobId);
+ Collection<String> checkJobIds =
repositoryAPI.getJobCheckGovernanceRepository().listCheckJobIds(parentJobId);
Optional<Integer> previousSequence =
ConsistencyCheckSequence.getPreviousSequence(
checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()),
ConsistencyCheckJobId.parseSequence(latestCheckJobId));
if (previousSequence.isPresent()) {
String checkJobId = new ConsistencyCheckJobId(contextKey,
parentJobId, previousSequence.get()).marshal();
- repositoryAPI.persistLatestCheckJobId(parentJobId, checkJobId);
+
repositoryAPI.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
checkJobId);
} else {
- repositoryAPI.deleteLatestCheckJobId(parentJobId);
+
repositoryAPI.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId);
}
- repositoryAPI.deleteCheckJobResult(parentJobId, latestCheckJobId);
+
repositoryAPI.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId,
latestCheckJobId);
new PipelineJobManager(this).drop(latestCheckJobId);
}
@@ -171,7 +171,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
*/
public List<ConsistencyCheckJobItemInfo> getJobItemInfos(final String
parentJobId) {
GovernanceRepositoryAPI governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
- Optional<String> latestCheckJobId =
governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
+ Optional<String> latestCheckJobId =
governanceRepositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(),
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
String checkJobId = latestCheckJobId.get();
PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
@@ -182,7 +182,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
List<ConsistencyCheckJobItemInfo> result = new LinkedList<>();
ConsistencyCheckJobItemProgress jobItemProgress = progress.get();
if (!Strings.isNullOrEmpty(jobItemProgress.getIgnoredTableNames())) {
- Map<String, TableDataConsistencyCheckResult> checkJobResult =
governanceRepositoryAPI.getCheckJobResult(parentJobId, latestCheckJobId.get());
+ Map<String, TableDataConsistencyCheckResult> checkJobResult =
governanceRepositoryAPI.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId,
latestCheckJobId.get());
result.addAll(buildIgnoredTableInfo(jobItemProgress.getIgnoredTableNames().split(","),
checkJobResult));
}
if (Objects.equals(jobItemProgress.getIgnoredTableNames(),
jobItemProgress.getTableNames())) {
@@ -212,7 +212,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
private ConsistencyCheckJobItemInfo getJobItemInfo(final String
parentJobId) {
GovernanceRepositoryAPI governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
- Optional<String> latestCheckJobId =
governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
+ Optional<String> latestCheckJobId =
governanceRepositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(),
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
String checkJobId = latestCheckJobId.get();
PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
@@ -233,7 +233,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
result.setErrorMessage(new
PipelineJobIteErrorMessageManager(checkJobId, 0).getErrorMessage());
- Map<String, TableDataConsistencyCheckResult> checkJobResults =
governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId);
+ Map<String, TableDataConsistencyCheckResult> checkJobResults =
governanceRepositoryAPI.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId,
checkJobId);
result.setCheckSuccess(checkJobResults.isEmpty() ? null :
checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched));
result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each
-> !each.getValue().isIgnored() && !each.getValue().isMatched())
.map(Entry::getKey).collect(Collectors.joining(",")));
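
Consistency check bookkeeping moves to getJobCheckGovernanceRepository(). A sketch of the step that records the latest check job id, as used in createJobAndStart above (not part of the commit; helper name illustrative):

    // Sketch only: record the latest check job id and clear any stale result stored under it.
    private void registerLatestCheckJobId(final GovernanceRepositoryAPI repositoryAPI, final String parentJobId, final String checkJobId) {
        repositoryAPI.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId, checkJobId);
        repositoryAPI.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId, checkJobId);
    }
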
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index e75f71f126d..67b7f80a7dc 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -113,7 +113,8 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
log.info("job {} with check algorithm '{}' data consistency
checker result: {}, stopping: {}",
parentJobId, checkJobConfig.getAlgorithmTypeName(),
checkResultMap, jobItemContext.isStopping());
if (!jobItemContext.isStopping()) {
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).persistCheckJobResult(parentJobId,
checkJobId, checkResultMap);
+ PipelineAPIFactory.getGovernanceRepositoryAPI(
+
PipelineJobIdUtils.parseContextKey(parentJobId)).getJobCheckGovernanceRepository().persistCheckJobResult(parentJobId,
checkJobId, checkResultMap);
}
} finally {
jobItemContext.getProgressContext().setCheckEndTimeMillis(System.currentTimeMillis());
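
Check results themselves are written and read back through the same sub-repository; a minimal round-trip sketch (not part of the commit; helper name illustrative):

    // Sketch only: persist a check result map and read it back.
    private Map<String, TableDataConsistencyCheckResult> persistAndLoadCheckResult(final GovernanceRepositoryAPI repositoryAPI,
                                                                                   final String parentJobId, final String checkJobId,
                                                                                   final Map<String, TableDataConsistencyCheckResult> checkResultMap) {
        repositoryAPI.getJobCheckGovernanceRepository().persistCheckJobResult(parentJobId, checkJobId, checkResultMap);
        return repositoryAPI.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId, checkJobId);
    }
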
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index d80211588ae..d18c30e45c6 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -306,7 +306,7 @@ public final class MigrationJobAPI implements
InventoryIncrementalJobAPI {
}
private void dropCheckJobs(final String jobId) {
- Collection<String> checkJobIds =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).listCheckJobIds(jobId);
+ Collection<String> checkJobIds =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().listCheckJobIds(jobId);
if (checkJobIds.isEmpty()) {
return;
}
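
Enumerating the check jobs of a parent job also goes through the check sub-repository; a sketch (not part of the commit; helper name illustrative):

    // Sketch only: true if the parent job has any check jobs registered.
    private boolean hasCheckJobs(final String jobId) {
        Collection<String> checkJobIds = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().listCheckJobIds(jobId);
        return !checkJobIds.isEmpty();
    }
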
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 8c46fd1761a..04cd7da0eb9 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -134,12 +134,12 @@ public final class MigrationJobPreparer {
if (lockContext.tryLock(lockDefinition, 600000)) {
log.info("try lock success, jobId={}, shardingItem={}, cost {}
ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
try {
- JobOffsetInfo offsetInfo =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId);
+ JobOffsetInfo offsetInfo =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().load(jobId);
if (!offsetInfo.isTargetSchemaTableCreated()) {
jobItemContext.setStatus(JobStatus.PREPARING);
jobItemManager.updateStatus(jobId,
jobItemContext.getShardingItem(), JobStatus.PREPARING);
prepareAndCheckTarget(jobItemContext);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobOffsetInfo(jobId,
new JobOffsetInfo(true));
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().persist(jobId,
new JobOffsetInfo(true));
}
} finally {
log.info("unlock, jobId={}, shardingItem={}, cost {} ms",
jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
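
The offset flag that guards one-time target preparation now lives behind getJobOffsetGovernanceRepository(); a sketch of the guard (not part of the commit; the preparation step itself is elided):

    // Sketch only: prepare the target schema/tables once, then record that in the job offset info.
    private void prepareTargetOnce(final String jobId) {
        JobOffsetInfo offsetInfo = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().load(jobId);
        if (!offsetInfo.isTargetSchemaTableCreated()) {
            // ... prepare target schema and tables here ...
            PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().persist(jobId, new JobOffsetInfo(true));
        }
    }
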
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
index 7806cda61dc..0e3e0ff9312 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.common.ingest.position.Placeholde
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineJobItemProcessGovernanceRepository;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
@@ -31,8 +32,6 @@ import
org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -54,7 +53,6 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -85,28 +83,43 @@ class GovernanceRepositoryAPIImplTest {
}
@Test
- void assertIsJobConfigurationExisted() {
-
assertFalse(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
- JobConfigurationPOJO value = new JobConfigurationPOJO();
- value.setJobName("foo_job");
- value.setShardingTotalCount(1);
- getClusterPersistRepository().persist("/pipeline/jobs/foo_job/config",
YamlEngine.marshal(value));
-
assertTrue(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
+ void assertWatch() throws InterruptedException {
+ String key = PipelineNodePath.DATA_PIPELINE_ROOT + "/1";
+ getClusterPersistRepository().persist(key, "");
+ boolean awaitResult = COUNT_DOWN_LATCH.await(10, TimeUnit.SECONDS);
+ assertTrue(awaitResult);
+ DataChangedEvent event = EVENT_ATOMIC_REFERENCE.get();
+ assertNotNull(event);
+ assertThat(event.getType(), anyOf(is(Type.ADDED), is(Type.UPDATED)));
}
@Test
- void assertPersistJobItemProgress() {
- MigrationJobItemContext jobItemContext = mockJobItemContext();
-
governanceRepositoryAPI.updateJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue1");
-
assertFalse(governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem()).isPresent());
-
governanceRepositoryAPI.persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue1");
- Optional<String> actual =
governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
- assertTrue(actual.isPresent());
- assertThat(actual.get(), is("testValue1"));
-
governanceRepositoryAPI.updateJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue2");
- actual =
governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
- assertTrue(actual.isPresent());
- assertThat(actual.get(), is("testValue2"));
+ void assertDeleteJob() {
+ ClusterPersistRepository clusterPersistRepository =
getClusterPersistRepository();
+ clusterPersistRepository.persist(PipelineNodePath.DATA_PIPELINE_ROOT +
"/1", "");
+ governanceRepositoryAPI.getJobGovernanceRepository().delete("1");
+ Optional<String> actual = new
PipelineJobItemProcessGovernanceRepository(clusterPersistRepository).load("1",
0);
+ assertFalse(actual.isPresent());
+ }
+
+ @Test
+ void assertIsExistedJobConfiguration() {
+ ClusterPersistRepository clusterPersistRepository =
getClusterPersistRepository();
+
assertFalse(governanceRepositoryAPI.getJobConfigurationGovernanceRepository().isExisted("foo_job"));
+ clusterPersistRepository.persist("/pipeline/jobs/foo_job/config",
"foo");
+
assertTrue(governanceRepositoryAPI.getJobConfigurationGovernanceRepository().isExisted("foo_job"));
+ }
+
+ @Test
+ void assertLatestCheckJobIdPersistenceDeletion() {
+ String parentJobId = "testParentJob";
+ String expectedCheckJobId = "testCheckJob";
+
governanceRepositoryAPI.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
expectedCheckJobId);
+ Optional<String> actualCheckJobIdOpt =
governanceRepositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+ assertTrue(actualCheckJobIdOpt.isPresent());
+ assertThat(actualCheckJobIdOpt.get(), is(expectedCheckJobId));
+
governanceRepositoryAPI.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId);
+
assertFalse(governanceRepositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId).isPresent());
}
@Test
@@ -114,59 +127,43 @@ class GovernanceRepositoryAPIImplTest {
MigrationJobItemContext jobItemContext = mockJobItemContext();
Map<String, TableDataConsistencyCheckResult> actual = new HashMap<>();
actual.put("test", new TableDataConsistencyCheckResult(true));
-
governanceRepositoryAPI.persistCheckJobResult(jobItemContext.getJobId(),
"j02123", actual);
- Map<String, TableDataConsistencyCheckResult> checkResult =
governanceRepositoryAPI.getCheckJobResult(jobItemContext.getJobId(), "j02123");
+
governanceRepositoryAPI.getJobCheckGovernanceRepository().persistCheckJobResult(jobItemContext.getJobId(),
"j02123", actual);
+ Map<String, TableDataConsistencyCheckResult> checkResult =
governanceRepositoryAPI.getJobCheckGovernanceRepository().getCheckJobResult(jobItemContext.getJobId(),
"j02123");
assertThat(checkResult.size(), is(1));
assertTrue(checkResult.get("test").isMatched());
}
@Test
- void assertDeleteJob() {
-
getClusterPersistRepository().persist(PipelineNodePath.DATA_PIPELINE_ROOT +
"/1", "");
- governanceRepositoryAPI.deleteJob("1");
- Optional<String> actual =
governanceRepositoryAPI.getJobItemProgress("1", 0);
- assertFalse(actual.isPresent());
+ void assertPersistJobItemProcess() {
+ MigrationJobItemContext jobItemContext = mockJobItemContext();
+
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue1");
+
assertFalse(governanceRepositoryAPI.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
jobItemContext.getShardingItem()).isPresent());
+
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue1");
+ Optional<String> actual =
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+ assertTrue(actual.isPresent());
+ assertThat(actual.get(), is("testValue1"));
+
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue2");
+ actual =
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+ assertTrue(actual.isPresent());
+ assertThat(actual.get(), is("testValue2"));
}
@Test
- void assertWatch() throws InterruptedException {
- String key = PipelineNodePath.DATA_PIPELINE_ROOT + "/1";
- getClusterPersistRepository().persist(key, "");
- boolean awaitResult = COUNT_DOWN_LATCH.await(10, TimeUnit.SECONDS);
- assertTrue(awaitResult);
- DataChangedEvent event = EVENT_ATOMIC_REFERENCE.get();
- assertNotNull(event);
- assertThat(event.getType(), anyOf(is(Type.ADDED), is(Type.UPDATED)));
+ void assertPersistJobOffset() {
+
assertFalse(governanceRepositoryAPI.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated());
+
governanceRepositoryAPI.getJobOffsetGovernanceRepository().persist("1", new
JobOffsetInfo(true));
+
assertTrue(governanceRepositoryAPI.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated());
}
@Test
void assertGetShardingItems() {
MigrationJobItemContext jobItemContext = mockJobItemContext();
-
governanceRepositoryAPI.persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue");
- List<Integer> shardingItems =
governanceRepositoryAPI.getShardingItems(jobItemContext.getJobId());
+
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue");
+ List<Integer> shardingItems =
governanceRepositoryAPI.getJobOffsetGovernanceRepository().getShardingItems(jobItemContext.getJobId());
assertThat(shardingItems.size(), is(1));
assertThat(shardingItems.get(0), is(jobItemContext.getShardingItem()));
}
- @Test
- void assertPersistJobOffsetInfo() {
-
assertFalse(governanceRepositoryAPI.getJobOffsetInfo("1").isTargetSchemaTableCreated());
- governanceRepositoryAPI.persistJobOffsetInfo("1", new
JobOffsetInfo(true));
-
assertTrue(governanceRepositoryAPI.getJobOffsetInfo("1").isTargetSchemaTableCreated());
- }
-
- @Test
- void assertLatestCheckJobIdPersistenceDeletion() {
- String parentJobId = "testParentJob";
- String expectedCheckJobId = "testCheckJob";
- governanceRepositoryAPI.persistLatestCheckJobId(parentJobId,
expectedCheckJobId);
- Optional<String> actualCheckJobIdOpt =
governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
- assertTrue(actualCheckJobIdOpt.isPresent(), "Expected a checkJobId to
be present");
- assertEquals(expectedCheckJobId, actualCheckJobIdOpt.get(), "The
retrieved checkJobId does not match the expected one");
- governanceRepositoryAPI.deleteLatestCheckJobId(parentJobId);
-
assertFalse(governanceRepositoryAPI.getLatestCheckJobId(parentJobId).isPresent(),
"Expected no checkJobId to be present after deletion");
- }
-
private ClusterPersistRepository getClusterPersistRepository() {
ContextManager contextManager =
PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager();
return (ClusterPersistRepository)
contextManager.getMetaDataContexts().getPersistService().getRepository();
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index a54697a5b66..075ac4950fe 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -51,7 +51,7 @@ class ConsistencyCheckJobTest {
ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new
PipelineContextKey(InstanceType.PROXY),
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
String checkJobId = pipelineJobId.marshal();
Map<String, Object> expectTableCheckPosition =
Collections.singletonMap("t_order", 100);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(checkJobId,
0,
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(checkJobId,
0,
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
ConsistencyCheckJob consistencyCheckJob = new
ConsistencyCheckJob(checkJobId);
ConsistencyCheckJobItemContext actual =
consistencyCheckJob.buildPipelineJobItemContext(
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index fe9ea2db83c..fd217a71ec8 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -91,20 +91,20 @@ class ConsistencyCheckJobAPITest {
new ConsistencyCheckJobConfiguration(checkJobId,
parentJobId, null, null, TypedSPILoader.getService(DatabaseType.class, "H2")),
0, JobStatus.FINISHED, null);
jobItemManager.persistProgress(checkJobItemContext);
Map<String, TableDataConsistencyCheckResult>
dataConsistencyCheckResult = Collections.singletonMap("t_order", new
TableDataConsistencyCheckResult(true));
- repositoryAPI.persistCheckJobResult(parentJobId, checkJobId,
dataConsistencyCheckResult);
- Optional<String> latestCheckJobId =
repositoryAPI.getLatestCheckJobId(parentJobId);
+
repositoryAPI.getJobCheckGovernanceRepository().persistCheckJobResult(parentJobId,
checkJobId, dataConsistencyCheckResult);
+ Optional<String> latestCheckJobId =
repositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
assertTrue(latestCheckJobId.isPresent());
assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()),
is(expectedSequence++));
}
expectedSequence = 2;
for (int i = 0; i < 2; i++) {
jobAPI.dropByParentJobId(parentJobId);
- Optional<String> latestCheckJobId =
repositoryAPI.getLatestCheckJobId(parentJobId);
+ Optional<String> latestCheckJobId =
repositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
assertTrue(latestCheckJobId.isPresent());
assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()),
is(expectedSequence--));
}
jobAPI.dropByParentJobId(parentJobId);
- Optional<String> latestCheckJobId =
repositoryAPI.getLatestCheckJobId(parentJobId);
+ Optional<String> latestCheckJobId =
repositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
assertFalse(latestCheckJobId.isPresent());
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index ce9b79e1e49..7e0cdfdab72 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -303,7 +303,7 @@ class MigrationJobAPITest {
YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new
YamlInventoryIncrementalJobItemProgress();
yamlJobItemProgress.setStatus(JobStatus.RUNNING.name());
yamlJobItemProgress.setSourceDatabaseType("MySQL");
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(jobId.get(),
0, YamlEngine.marshal(yamlJobItemProgress));
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(),
0, YamlEngine.marshal(yamlJobItemProgress));
List<InventoryIncrementalJobItemInfo> jobItemInfos =
inventoryIncrementalJobManager.getJobItemInfos(jobId.get());
assertThat(jobItemInfos.size(), is(1));
InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
@@ -320,7 +320,7 @@ class MigrationJobAPITest {
yamlJobItemProgress.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK.name());
yamlJobItemProgress.setProcessedRecordsCount(100);
yamlJobItemProgress.setInventoryRecordsCount(50);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(jobId.get(),
0, YamlEngine.marshal(yamlJobItemProgress));
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(),
0, YamlEngine.marshal(yamlJobItemProgress));
List<InventoryIncrementalJobItemInfo> jobItemInfos =
inventoryIncrementalJobManager.getJobItemInfos(jobId.get());
InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
assertThat(jobItemInfo.getJobItemProgress().getStatus(),
is(JobStatus.EXECUTE_INCREMENTAL_TASK));
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 375062ab197..d23437a551a 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -63,7 +63,7 @@ class MigrationDataConsistencyCheckerTest {
jobConfigurationPOJO.setShardingTotalCount(1);
GovernanceRepositoryAPI governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config",
jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
- governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(),
0, "");
+
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().persist(jobConfig.getJobId(),
0, "");
Map<String, TableDataConsistencyCheckResult> actual = new
MigrationDataConsistencyChecker(jobConfig, new
MigrationProcessContext(jobConfig.getJobId(), null),
createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE",
null);
String checkKey = "t_order";