This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2a7c5f98c0d Refactor GovernanceRepositoryAPI (#29117)
2a7c5f98c0d is described below
commit 2a7c5f98c0d0fb517d8f156f7c4239f9fd93bf35
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Nov 21 23:56:10 2023 +0800
Refactor GovernanceRepositoryAPI (#29117)
* Remove GovernanceRepositoryAPI.getChildrenKeys()
* Remove GovernanceRepositoryAPI.watchPipeLineRootPath()
* Rename GovernanceRepositoryAPI.watchPipeLineRootPath()
* Rename GovernanceRepositoryAPI.updateJobItemErrorMessage()
* Refactor GovernanceRepositoryAPI
---
.../metadata/node/PipelineMetaDataNodeWatcher.java | 2 +-
.../repository/GovernanceRepositoryAPI.java | 44 +++++++++++-----------
.../repository/GovernanceRepositoryAPIImpl.java | 27 +++++++------
.../service/PipelineJobIteErrorMessageManager.java | 5 +--
.../core/job/service/PipelineJobManager.java | 5 +--
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 5 +--
.../service/GovernanceRepositoryAPIImplTest.java | 24 ++++++------
.../MigrationDataConsistencyCheckerTest.java | 10 ++++-
8 files changed, 66 insertions(+), 56 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java
index 39f5c0a064b..42003ec1d66 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java
@@ -50,7 +50,7 @@ public final class PipelineMetaDataNodeWatcher {
private PipelineMetaDataNodeWatcher(final PipelineContextKey contextKey) {
listenerMap.putAll(ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedEventHandler.class)
.stream().collect(Collectors.toMap(PipelineMetaDataChangedEventHandler::getKeyPattern,
each -> each)));
-
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watch(PipelineNodePath.DATA_PIPELINE_ROOT,
this::dispatchEvent);
+
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watchPipeLineRootPath(this::dispatchEvent);
}
private void dispatchEvent(final DataChangedEvent event) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
index 83904a361a9..87496931229 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
@@ -17,8 +17,10 @@
package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import java.util.Collection;
@@ -31,6 +33,13 @@ import java.util.Optional;
*/
public interface GovernanceRepositoryAPI {
+ /**
+ * Watch pipeLine root path.
+ *
+ * @param listener data changed event listener
+ */
+ void watchPipeLineRootPath(DataChangedEventListener listener);
+
/**
* Whether job configuration existed.
*
@@ -147,36 +156,29 @@ public interface GovernanceRepositoryAPI {
void deleteJob(String jobId);
/**
- * Get node's sub-nodes list.
- *
- * @param key key of data
- * @return sub-nodes name list
- */
- List<String> getChildrenKeys(String key);
-
- /**
- * Watch key or path of governance server.
+ * Persist job root info.
*
- * @param key key of data
- * @param listener data changed event listener
+ * @param jobId job ID
+ * @param jobClass job class
*/
- void watch(String key, DataChangedEventListener listener);
+ void persistJobRootInfo(String jobId, Class<? extends PipelineJob>
jobClass);
/**
- * Persist data.
- *
- * @param key key of data
- * @param value value of data
+ * Persist job configuration.
+ *
+ * @param jobId job ID
+ * @param jobConfigPOJO job configuration POJO
*/
- void persist(String key, String value);
+ void persistJobConfiguration(String jobId, JobConfigurationPOJO
jobConfigPOJO);
/**
- * Update data.
+ * Update job item error message.
*
- * @param key key of data
- * @param value value of data
+ * @param jobId job ID
+ * @param shardingItem sharding item
+ * @param errorMessage error message
*/
- void update(String key, String value);
+ void updateJobItemErrorMessage(String jobId, int shardingItem, String
errorMessage);
/**
* Get sharding items of job.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
index 9e12ee8d9a6..1ea1dee6c94 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
@@ -20,13 +20,16 @@ package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -50,6 +53,11 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
private final ClusterPersistRepository repository;
+ @Override
+ public void watchPipeLineRootPath(final DataChangedEventListener listener)
{
+ repository.watch(PipelineNodePath.DATA_PIPELINE_ROOT, listener);
+ }
+
@Override
public boolean isJobConfigurationExisted(final String jobId) {
return null !=
repository.getDirectly(PipelineMetaDataNode.getJobConfigurationPath(jobId));
@@ -142,28 +150,23 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
}
@Override
- public List<String> getChildrenKeys(final String key) {
- return repository.getChildrenKeys(key);
- }
-
- @Override
- public void watch(final String key, final DataChangedEventListener
listener) {
- repository.watch(key, listener);
+ public void persistJobRootInfo(final String jobId, final Class<? extends
PipelineJob> jobClass) {
+ repository.persist(PipelineMetaDataNode.getJobRootPath(jobId),
jobClass.getName());
}
@Override
- public void persist(final String key, final String value) {
- repository.persist(key, value);
+ public void persistJobConfiguration(final String jobId, final
JobConfigurationPOJO jobConfigPOJO) {
+
repository.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId),
YamlEngine.marshal(jobConfigPOJO));
}
@Override
- public void update(final String key, final String value) {
- repository.update(key, value);
+ public void updateJobItemErrorMessage(final String jobId, final int
shardingItem, final String errorMessage) {
+
repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem), errorMessage);
}
@Override
public List<Integer> getShardingItems(final String jobId) {
- List<String> result =
getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
+ List<String> result =
repository.getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
return
result.stream().map(Integer::parseInt).collect(Collectors.toList());
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
index a20cd2e7a4f..669d7eebbfe 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.job.service;
import org.apache.commons.lang3.exception.ExceptionUtils;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -56,7 +55,7 @@ public final class PipelineJobIteErrorMessageManager {
* @param error error
*/
public void updateErrorMessage(final Object error) {
-
governanceRepositoryAPI.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem), null == error ? "" : buildErrorMessage(error));
+ governanceRepositoryAPI.updateJobItemErrorMessage(jobId, shardingItem,
null == error ? "" : buildErrorMessage(error));
}
private String buildErrorMessage(final Object error) {
@@ -67,6 +66,6 @@ public final class PipelineJobIteErrorMessageManager {
* Clean job item error message.
*/
public void cleanErrorMessage() {
-
governanceRepositoryAPI.persist(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem), "");
+ governanceRepositoryAPI.updateJobItemErrorMessage(jobId, shardingItem,
"");
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index e8e1c821481..9dfa1c4f62a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -33,7 +33,6 @@ import
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@@ -80,8 +79,8 @@ public final class PipelineJobManager {
log.warn("jobId already exists in registry center, ignore, job id
is `{}`", jobId);
return Optional.of(jobId);
}
- repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId),
jobAPI.getJobClass().getName());
-
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId),
YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO()));
+ repositoryAPI.persistJobRootInfo(jobId, jobAPI.getJobClass());
+ repositoryAPI.persistJobConfiguration(jobId,
jobConfig.convertToJobConfigurationPOJO());
return Optional.of(jobId);
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 17c3daa7a1f..0fdff431d4b 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -49,7 +49,6 @@ import
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipeli
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
@@ -125,10 +124,10 @@ public final class CDCJobAPI implements
InventoryIncrementalJobAPI {
if (repositoryAPI.isJobConfigurationExisted(jobConfig.getJobId())) {
log.warn("CDC job already exists in registry center, ignore, job
id is `{}`", jobConfig.getJobId());
} else {
-
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()),
getJobClass().getName());
+ repositoryAPI.persistJobRootInfo(jobConfig.getJobId(),
getJobClass());
JobConfigurationPOJO jobConfigPOJO =
jobConfig.convertToJobConfigurationPOJO();
jobConfigPOJO.setDisabled(true);
-
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigurationPath(jobConfig.getJobId()),
YamlEngine.marshal(jobConfigPOJO));
+ repositoryAPI.persistJobConfiguration(jobConfig.getJobId(),
jobConfigPOJO);
if (!param.isFull()) {
initIncrementalPosition(jobConfig);
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
index 9c5bca8cf1d..cf5203910f4 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.job.service;
+import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
@@ -32,6 +33,8 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
import org.junit.jupiter.api.BeforeAll;
@@ -71,7 +74,7 @@ class GovernanceRepositoryAPIImplTest {
}
private static void watch() {
- governanceRepositoryAPI.watch(PipelineNodePath.DATA_PIPELINE_ROOT,
event -> {
+ governanceRepositoryAPI.watchPipeLineRootPath(event -> {
if ((PipelineNodePath.DATA_PIPELINE_ROOT +
"/1").equals(event.getKey())) {
EVENT_ATOMIC_REFERENCE.set(event);
COUNT_DOWN_LATCH.countDown();
@@ -82,7 +85,7 @@ class GovernanceRepositoryAPIImplTest {
@Test
void assertIsJobConfigurationExisted() {
assertFalse(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
- governanceRepositoryAPI.persist("/pipeline/jobs/foo_job/config",
"foo");
+ getClusterPersistRepository().persist("/pipeline/jobs/foo_job/config",
"foo");
assertTrue(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
}
@@ -114,24 +117,16 @@ class GovernanceRepositoryAPIImplTest {
@Test
void assertDeleteJob() {
- governanceRepositoryAPI.persist(PipelineNodePath.DATA_PIPELINE_ROOT +
"/1", "");
+
getClusterPersistRepository().persist(PipelineNodePath.DATA_PIPELINE_ROOT +
"/1", "");
governanceRepositoryAPI.deleteJob("1");
Optional<String> actual =
governanceRepositoryAPI.getJobItemProgress("1", 0);
assertFalse(actual.isPresent());
}
- @Test
- void assertGetChildrenKeys() {
- governanceRepositoryAPI.persist(PipelineNodePath.DATA_PIPELINE_ROOT +
"/1", "");
- List<String> actual =
governanceRepositoryAPI.getChildrenKeys(PipelineNodePath.DATA_PIPELINE_ROOT);
- assertFalse(actual.isEmpty());
- assertTrue(actual.contains("1"));
- }
-
@Test
void assertWatch() throws InterruptedException {
String key = PipelineNodePath.DATA_PIPELINE_ROOT + "/1";
- governanceRepositoryAPI.persist(key, "");
+ getClusterPersistRepository().persist(key, "");
boolean awaitResult = COUNT_DOWN_LATCH.await(10, TimeUnit.SECONDS);
assertTrue(awaitResult);
DataChangedEvent event = EVENT_ATOMIC_REFERENCE.get();
@@ -167,6 +162,11 @@ class GovernanceRepositoryAPIImplTest {
assertFalse(governanceRepositoryAPI.getLatestCheckJobId(parentJobId).isPresent(),
"Expected no checkJobId to be present after deletion");
}
+ private ClusterPersistRepository getClusterPersistRepository() {
+ ContextManager contextManager =
PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager();
+ return (ClusterPersistRepository)
contextManager.getMetaDataContexts().getPersistService().getRepository();
+ }
+
private MigrationJobItemContext mockJobItemContext() {
MigrationJobItemContext result =
PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
MigrationTaskConfiguration taskConfig = result.getTaskConfig();
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 3ba0e7724e8..375062ab197 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -18,6 +18,7 @@
package
org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.check.consistency;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
@@ -32,6 +33,8 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.Migrat
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
import org.junit.jupiter.api.BeforeAll;
@@ -59,7 +62,7 @@ class MigrationDataConsistencyCheckerTest {
jobConfigurationPOJO.setJobName(jobConfig.getJobId());
jobConfigurationPOJO.setShardingTotalCount(1);
GovernanceRepositoryAPI governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
-
governanceRepositoryAPI.persist(String.format("/pipeline/jobs/%s/config",
jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
+
getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config",
jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(),
0, "");
Map<String, TableDataConsistencyCheckResult> actual = new
MigrationDataConsistencyChecker(jobConfig, new
MigrationProcessContext(jobConfig.getJobId(), null),
createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE",
null);
@@ -68,6 +71,11 @@ class MigrationDataConsistencyCheckerTest {
assertTrue(actual.get(checkKey).isMatched());
}
+ private ClusterPersistRepository getClusterPersistRepository() {
+ ContextManager contextManager =
PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager();
+ return (ClusterPersistRepository)
contextManager.getMetaDataContexts().getPersistService().getRepository();
+ }
+
private ConsistencyCheckJobItemProgressContext
createConsistencyCheckJobItemProgressContext(final String jobId) {
return new ConsistencyCheckJobItemProgressContext(jobId, 0, "H2");
}