This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 459c7050a2d Refactor PipelineGovernanceFacade (#29140)
459c7050a2d is described below
commit 459c7050a2da46a354151062f8c26ab0012a8b42
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 23 18:49:22 2023 +0800
Refactor PipelineGovernanceFacade (#29140)
---
.../metadata/node/PipelineMetaDataNodeWatcher.java | 2 +-
.../repository/GovernanceRepositoryAPI.java | 89 ----------------------
...yAPIImpl.java => PipelineGovernanceFacade.java} | 14 +++-
.../core/job/service/PipelineAPIFactory.java | 17 ++---
.../service/PipelineJobIteErrorMessageManager.java | 12 +--
.../core/job/service/PipelineJobItemManager.java | 8 +-
.../core/job/service/PipelineJobManager.java | 16 ++--
.../metadata/PipelineDataSourcePersistService.java | 4 +-
...PipelineProcessConfigurationPersistService.java | 4 +-
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 12 +--
.../api/impl/ConsistencyCheckJobAPI.java | 34 ++++-----
.../task/ConsistencyCheckTasksRunner.java | 2 +-
.../migration/api/impl/MigrationJobAPI.java | 2 +-
.../migration/prepare/MigrationJobPreparer.java | 4 +-
...Test.java => PipelineGovernanceFacadeTest.java} | 46 +++++------
.../consistencycheck/ConsistencyCheckJobTest.java | 2 +-
.../api/impl/ConsistencyCheckJobAPITest.java | 12 +--
.../migration/api/impl/MigrationJobAPITest.java | 4 +-
.../MigrationDataConsistencyCheckerTest.java | 6 +-
19 files changed, 103 insertions(+), 187 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java
index 42003ec1d66..df4f34b2506 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java
@@ -50,7 +50,7 @@ public final class PipelineMetaDataNodeWatcher {
private PipelineMetaDataNodeWatcher(final PipelineContextKey contextKey) {
listenerMap.putAll(ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedEventHandler.class)
.stream().collect(Collectors.toMap(PipelineMetaDataChangedEventHandler::getKeyPattern,
each -> each)));
-
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watchPipeLineRootPath(this::dispatchEvent);
+
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).watchPipeLineRootPath(this::dispatchEvent);
}
private void dispatchEvent(final DataChangedEvent event) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
deleted file mode 100644
index 930d24226b0..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
-
-import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
-
-/**
- * Governance repository API.
- */
-public interface GovernanceRepositoryAPI {
-
- /**
- * Get job configuration governance repository.
- *
- * @return job configuration governance repository
- */
- PipelineJobConfigurationGovernanceRepository
getJobConfigurationGovernanceRepository();
-
- /**
- * Get job offset governance repository.
- *
- * @return job offset governance repository
- */
- PipelineJobOffsetGovernanceRepository getJobOffsetGovernanceRepository();
-
- /**
- * Get job item process governance repository.
- *
- * @return job item process governance repository
- */
- PipelineJobItemProcessGovernanceRepository
getJobItemProcessGovernanceRepository();
-
- /**
- * Get job item error message governance repository.
- *
- * @return job item error message governance repository
- */
- PipelineJobItemErrorMessageGovernanceRepository
getJobItemErrorMessageGovernanceRepository();
-
- /**
- * Get job check governance repository.
- *
- * @return job check governance repository
- */
- PipelineJobCheckGovernanceRepository getJobCheckGovernanceRepository();
-
- /**
- * Get job governance repository.
- *
- * @return job governance repository
- */
- PipelineJobGovernanceRepository getJobGovernanceRepository();
-
- /**
- * Get meta data data source governance repository.
- *
- * @return meta data data source governance repository
- */
- PipelineMetaDataDataSourceGovernanceRepository
getMetaDataDataSourceGovernanceRepository();
-
- /**
- * Get meta data process configuration governance repository.
- *
- * @return meta data process configuration governance repository
- */
- PipelineMetaDataProcessConfigurationGovernanceRepository
getMetaDataProcessConfigurationGovernanceRepository();
-
- /**
- * Watch pipeLine root path.
- *
- * @param listener data changed event listener
- */
- void watchPipeLineRootPath(DataChangedEventListener listener);
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java
similarity index 90%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java
index 2605c893bfb..a2b33c35d69 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java
@@ -17,17 +17,19 @@
package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+import lombok.AccessLevel;
import lombok.Getter;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
/**
- * Governance repository API impl.
+ * Pipeline governance facade.
*/
@Getter
-public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAPI {
+public final class PipelineGovernanceFacade {
+ @Getter(AccessLevel.NONE)
private final ClusterPersistRepository repository;
private final PipelineJobConfigurationGovernanceRepository
jobConfigurationGovernanceRepository;
@@ -46,7 +48,7 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
private final PipelineMetaDataProcessConfigurationGovernanceRepository
metaDataProcessConfigurationGovernanceRepository;
- public GovernanceRepositoryAPIImpl(final ClusterPersistRepository
repository) {
+ public PipelineGovernanceFacade(final ClusterPersistRepository repository)
{
this.repository = repository;
jobConfigurationGovernanceRepository = new
PipelineJobConfigurationGovernanceRepository(repository);
jobOffsetGovernanceRepository = new
PipelineJobOffsetGovernanceRepository(repository);
@@ -58,7 +60,11 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
metaDataProcessConfigurationGovernanceRepository = new
PipelineMetaDataProcessConfigurationGovernanceRepository(repository);
}
- @Override
+ /**
+ * Watch pipeLine root path.
+ *
+ * @param listener data changed event listener
+ */
public void watchPipeLineRootPath(final DataChangedEventListener listener)
{
repository.watch(PipelineNodePath.DATA_PIPELINE_ROOT, listener);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineAPIFactory.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineAPIFactory.java
index e5b4fbc540c..99108b15730 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineAPIFactory.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineAPIFactory.java
@@ -27,8 +27,7 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.elasticjob.CoordinatorRegistryCenterInitializer;
-import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
-import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPIImpl;
+import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
import
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobOperateAPI;
import
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobStatisticsAPI;
@@ -49,22 +48,22 @@ import java.util.concurrent.ConcurrentHashMap;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PipelineAPIFactory {
- private static final Map<PipelineContextKey,
LazyInitializer<GovernanceRepositoryAPI>> GOVERNANCE_REPOSITORY_API_MAP = new
ConcurrentHashMap<>();
+ private static final Map<PipelineContextKey,
LazyInitializer<PipelineGovernanceFacade>> GOVERNANCE_FACADE_MAP = new
ConcurrentHashMap<>();
/**
- * Get governance repository API.
+ * Get pipeline governance facade.
*
* @param contextKey context key
- * @return governance repository API
+ * @return pipeline governance facade
*/
@SneakyThrows(ConcurrentException.class)
- public static GovernanceRepositoryAPI getGovernanceRepositoryAPI(final
PipelineContextKey contextKey) {
- return GOVERNANCE_REPOSITORY_API_MAP.computeIfAbsent(contextKey, key
-> new LazyInitializer<GovernanceRepositoryAPI>() {
+ public static PipelineGovernanceFacade getPipelineGovernanceFacade(final
PipelineContextKey contextKey) {
+ return GOVERNANCE_FACADE_MAP.computeIfAbsent(contextKey, key -> new
LazyInitializer<PipelineGovernanceFacade>() {
@Override
- protected GovernanceRepositoryAPI initialize() {
+ protected PipelineGovernanceFacade initialize() {
ContextManager contextManager =
PipelineContextManager.getContext(contextKey).getContextManager();
- return new
GovernanceRepositoryAPIImpl((ClusterPersistRepository)
contextManager.getMetaDataContexts().getPersistService().getRepository());
+ return new PipelineGovernanceFacade((ClusterPersistRepository)
contextManager.getMetaDataContexts().getPersistService().getRepository());
}
}).get();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
index 879b241e58e..b63e9b5b269 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.job.service;
import org.apache.commons.lang3.exception.ExceptionUtils;
-import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import java.util.Optional;
@@ -32,12 +32,12 @@ public final class PipelineJobIteErrorMessageManager {
private final int shardingItem;
- private final GovernanceRepositoryAPI governanceRepositoryAPI;
+ private final PipelineGovernanceFacade governanceFacade;
public PipelineJobIteErrorMessageManager(final String jobId, final int
shardingItem) {
this.jobId = jobId;
this.shardingItem = shardingItem;
- governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
+ governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
}
/**
@@ -46,7 +46,7 @@ public final class PipelineJobIteErrorMessageManager {
* @return map, key is sharding item, value is error message
*/
public String getErrorMessage() {
- return
Optional.ofNullable(governanceRepositoryAPI.getJobItemErrorMessageGovernanceRepository().load(jobId,
shardingItem)).orElse("");
+ return
Optional.ofNullable(governanceFacade.getJobItemErrorMessageGovernanceRepository().load(jobId,
shardingItem)).orElse("");
}
/**
@@ -55,7 +55,7 @@ public final class PipelineJobIteErrorMessageManager {
* @param error error
*/
public void updateErrorMessage(final Object error) {
-
governanceRepositoryAPI.getJobItemErrorMessageGovernanceRepository().update(jobId,
shardingItem, null == error ? "" : buildErrorMessage(error));
+
governanceFacade.getJobItemErrorMessageGovernanceRepository().update(jobId,
shardingItem, null == error ? "" : buildErrorMessage(error));
}
private String buildErrorMessage(final Object error) {
@@ -66,6 +66,6 @@ public final class PipelineJobIteErrorMessageManager {
* Clean job item error message.
*/
public void cleanErrorMessage() {
-
governanceRepositoryAPI.getJobItemErrorMessageGovernanceRepository().update(jobId,
shardingItem, "");
+
governanceFacade.getJobItemErrorMessageGovernanceRepository().update(jobId,
shardingItem, "");
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
index 78a880c68c9..d2463bc7f5f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
@@ -54,7 +54,7 @@ public final class PipelineJobItemManager<T extends
PipelineJobItemProgress> {
return;
}
jobItemProgress.get().setStatus(status);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId))
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId))
.getJobItemProcessGovernanceRepository().update(jobId,
shardingItem,
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
}
@@ -66,7 +66,7 @@ public final class PipelineJobItemManager<T extends
PipelineJobItemProgress> {
* @return job item progress
*/
public Optional<T> getProgress(final String jobId, final int shardingItem)
{
- return
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().load(jobId,
shardingItem)
+ return
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().load(jobId,
shardingItem)
.map(optional ->
swapper.swapToObject(YamlEngine.unmarshal(optional,
swapper.getYamlProgressClass(), true)));
}
@@ -76,7 +76,7 @@ public final class PipelineJobItemManager<T extends
PipelineJobItemProgress> {
* @param jobItemContext job item context
*/
public void persistProgress(final PipelineJobItemContext jobItemContext) {
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
.getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
}
@@ -86,7 +86,7 @@ public final class PipelineJobItemManager<T extends
PipelineJobItemProgress> {
* @param jobItemContext job item context
*/
public void updateProgress(final PipelineJobItemContext jobItemContext) {
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index a3a89ccf6e6..59d205b8528 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -25,7 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
import
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
@@ -74,13 +74,13 @@ public final class PipelineJobManager {
public Optional<String> start(final PipelineJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
ShardingSpherePreconditions.checkState(0 !=
jobConfig.getJobShardingCount(), () -> new
PipelineJobCreationWithInvalidShardingCountException(jobId));
- GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
- if
(repositoryAPI.getJobConfigurationGovernanceRepository().isExisted(jobId)) {
+ PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
+ if
(governanceFacade.getJobConfigurationGovernanceRepository().isExisted(jobId)) {
log.warn("jobId already exists in registry center, ignore, job id
is `{}`", jobId);
return Optional.of(jobId);
}
- repositoryAPI.getJobGovernanceRepository().create(jobId,
jobAPI.getJobClass());
- repositoryAPI.getJobConfigurationGovernanceRepository().persist(jobId,
jobConfig.convertToJobConfigurationPOJO());
+ governanceFacade.getJobGovernanceRepository().create(jobId,
jobAPI.getJobClass());
+
governanceFacade.getJobConfigurationGovernanceRepository().persist(jobId,
jobConfig.convertToJobConfigurationPOJO());
return Optional.of(jobId);
}
@@ -119,7 +119,7 @@ public final class PipelineJobManager {
}
private void startNextDisabledJob(final String jobId, final String
toBeStartDisabledNextJobType) {
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional
-> {
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional
-> {
try {
new
PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
toBeStartDisabledNextJobType)).startDisabledJob(optional);
// CHECKSTYLE:OFF
@@ -141,7 +141,7 @@ public final class PipelineJobManager {
}
private void stopPreviousJob(final String jobId, final String
toBeStoppedPreviousJobType) {
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional
-> {
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional
-> {
try {
new
PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
toBeStoppedPreviousJobType)).stop(optional);
// CHECKSTYLE:OFF
@@ -176,7 +176,7 @@ public final class PipelineJobManager {
public void drop(final String jobId) {
PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(jobId);
PipelineAPIFactory.getJobOperateAPI(contextKey).remove(String.valueOf(jobId),
null);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getJobGovernanceRepository().delete(jobId);
+
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobGovernanceRepository().delete(jobId);
}
/**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
index 42e0035890d..a951f9fee3c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
@@ -39,7 +39,7 @@ public final class PipelineDataSourcePersistService
implements PipelineMetaDataP
@Override
@SuppressWarnings("unchecked")
public Map<String, DataSourcePoolProperties> load(final PipelineContextKey
contextKey, final String jobType) {
- String dataSourcesProps =
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataDataSourceGovernanceRepository().load(jobType);
+ String dataSourcesProps =
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataDataSourceGovernanceRepository().load(jobType);
if (Strings.isNullOrEmpty(dataSourcesProps)) {
return Collections.emptyMap();
}
@@ -55,6 +55,6 @@ public final class PipelineDataSourcePersistService
implements PipelineMetaDataP
for (Entry<String, DataSourcePoolProperties> entry :
propsMap.entrySet()) {
dataSourceMap.put(entry.getKey(),
swapper.swapToMap(entry.getValue()));
}
-
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataDataSourceGovernanceRepository().persist(jobType,
YamlEngine.marshal(dataSourceMap));
+
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataDataSourceGovernanceRepository().persist(jobType,
YamlEngine.marshal(dataSourceMap));
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
index b20a5e22935..5b8c1bce798 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
@@ -34,7 +34,7 @@ public final class PipelineProcessConfigurationPersistService
implements Pipelin
@Override
public PipelineProcessConfiguration load(final PipelineContextKey
contextKey, final String jobType) {
- String yamlText =
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataProcessConfigurationGovernanceRepository().load(jobType);
+ String yamlText =
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataProcessConfigurationGovernanceRepository().load(jobType);
if (Strings.isNullOrEmpty(yamlText)) {
return null;
}
@@ -45,6 +45,6 @@ public final class PipelineProcessConfigurationPersistService
implements Pipelin
@Override
public void persist(final PipelineContextKey contextKey, final String
jobType, final PipelineProcessConfiguration processConfig) {
String yamlText =
YamlEngine.marshal(swapper.swapToYamlConfiguration(processConfig));
-
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataProcessConfigurationGovernanceRepository().persist(jobType,
yamlText);
+
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataProcessConfigurationGovernanceRepository().persist(jobType,
yamlText);
}
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index a46629de46d..0b4a06dd368 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -51,7 +51,7 @@ import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrem
import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
import
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
import
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
@@ -120,14 +120,14 @@ public final class CDCJobAPI implements
InventoryIncrementalJobAPI {
extendYamlJobConfiguration(contextKey, yamlJobConfig);
CDCJobConfiguration jobConfig = new
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
ShardingSpherePreconditions.checkState(0 !=
jobConfig.getJobShardingCount(), () -> new
PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
- GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
- if
(repositoryAPI.getJobConfigurationGovernanceRepository().isExisted(jobConfig.getJobId()))
{
+ PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+ if
(governanceFacade.getJobConfigurationGovernanceRepository().isExisted(jobConfig.getJobId()))
{
log.warn("CDC job already exists in registry center, ignore, job
id is `{}`", jobConfig.getJobId());
} else {
-
repositoryAPI.getJobGovernanceRepository().create(jobConfig.getJobId(),
getJobClass());
+
governanceFacade.getJobGovernanceRepository().create(jobConfig.getJobId(),
getJobClass());
JobConfigurationPOJO jobConfigPOJO =
jobConfig.convertToJobConfigurationPOJO();
jobConfigPOJO.setDisabled(true);
-
repositoryAPI.getJobConfigurationGovernanceRepository().persist(jobConfig.getJobId(),
jobConfigPOJO);
+
governanceFacade.getJobConfigurationGovernanceRepository().persist(jobConfig.getJobId(),
jobConfigPOJO);
if (!param.isFull()) {
initIncrementalPosition(jobConfig);
}
@@ -177,7 +177,7 @@ public final class CDCJobAPI implements
InventoryIncrementalJobAPI {
}
IncrementalDumperContext dumperContext =
buildDumperContext(jobConfig, i, new
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
InventoryIncrementalJobItemProgress jobItemProgress =
getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager,
dumperContext);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().persist(
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().persist(
jobId, i,
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
}
} catch (final SQLException ex) {
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index e6b5c4a2ac9..d9a48111722 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -24,7 +24,7 @@ import
org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
import
org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
-import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
@@ -79,8 +79,8 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
*/
public String createJobAndStart(final CreateConsistencyCheckJobParameter
param) {
String parentJobId = param.getParentJobId();
- GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
- Optional<String> latestCheckJobId =
repositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+ PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId));
+ Optional<String> latestCheckJobId =
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
if (latestCheckJobId.isPresent()) {
PipelineJobItemManager<ConsistencyCheckJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
Optional<ConsistencyCheckJobItemProgress> progress =
jobItemManager.getProgress(latestCheckJobId.get(), 0);
@@ -92,8 +92,8 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
verifyPipelineDatabaseType(param);
PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(parentJobId);
String result = latestCheckJobId.map(s -> new
ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new
ConsistencyCheckJobId(contextKey, parentJobId)).marshal();
-
repositoryAPI.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
result);
-
repositoryAPI.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId,
result);
+
governanceFacade.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
result);
+
governanceFacade.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId,
result);
new PipelineJobManager(this).drop(result);
YamlConsistencyCheckJobConfiguration yamlConfig = new
YamlConsistencyCheckJobConfiguration();
yamlConfig.setJobId(result);
@@ -126,7 +126,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
}
private String getLatestCheckJobId(final String parentJobId) {
- Optional<String> result =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+ Optional<String> result =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(result.isPresent(), () -> new
ConsistencyCheckJobNotFoundException(parentJobId));
return result.get();
}
@@ -149,17 +149,17 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
String latestCheckJobId = getLatestCheckJobId(parentJobId);
new PipelineJobManager(this).stop(latestCheckJobId);
PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(parentJobId);
- GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey);
- Collection<String> checkJobIds =
repositoryAPI.getJobCheckGovernanceRepository().listCheckJobIds(parentJobId);
+ PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
+ Collection<String> checkJobIds =
governanceFacade.getJobCheckGovernanceRepository().listCheckJobIds(parentJobId);
Optional<Integer> previousSequence =
ConsistencyCheckSequence.getPreviousSequence(
checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()),
ConsistencyCheckJobId.parseSequence(latestCheckJobId));
if (previousSequence.isPresent()) {
String checkJobId = new ConsistencyCheckJobId(contextKey,
parentJobId, previousSequence.get()).marshal();
-
repositoryAPI.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
checkJobId);
+
governanceFacade.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
checkJobId);
} else {
-
repositoryAPI.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId);
+
governanceFacade.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId);
}
-
repositoryAPI.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId,
latestCheckJobId);
+
governanceFacade.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId,
latestCheckJobId);
new PipelineJobManager(this).drop(latestCheckJobId);
}
@@ -170,8 +170,8 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
* @return consistency job item infos
*/
public List<ConsistencyCheckJobItemInfo> getJobItemInfos(final String
parentJobId) {
- GovernanceRepositoryAPI governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
- Optional<String> latestCheckJobId =
governanceRepositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+ PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId));
+ Optional<String> latestCheckJobId =
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(),
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
String checkJobId = latestCheckJobId.get();
PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
@@ -182,7 +182,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
List<ConsistencyCheckJobItemInfo> result = new LinkedList<>();
ConsistencyCheckJobItemProgress jobItemProgress = progress.get();
if (!Strings.isNullOrEmpty(jobItemProgress.getIgnoredTableNames())) {
- Map<String, TableDataConsistencyCheckResult> checkJobResult =
governanceRepositoryAPI.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId,
latestCheckJobId.get());
+ Map<String, TableDataConsistencyCheckResult> checkJobResult =
governanceFacade.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId,
latestCheckJobId.get());
result.addAll(buildIgnoredTableInfo(jobItemProgress.getIgnoredTableNames().split(","),
checkJobResult));
}
if (Objects.equals(jobItemProgress.getIgnoredTableNames(),
jobItemProgress.getTableNames())) {
@@ -211,8 +211,8 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
}
private ConsistencyCheckJobItemInfo getJobItemInfo(final String
parentJobId) {
- GovernanceRepositoryAPI governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
- Optional<String> latestCheckJobId =
governanceRepositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+ PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId));
+ Optional<String> latestCheckJobId =
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(),
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
String checkJobId = latestCheckJobId.get();
PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
@@ -233,7 +233,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
result.setErrorMessage(new
PipelineJobIteErrorMessageManager(checkJobId, 0).getErrorMessage());
- Map<String, TableDataConsistencyCheckResult> checkJobResults =
governanceRepositoryAPI.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId,
checkJobId);
+ Map<String, TableDataConsistencyCheckResult> checkJobResults =
governanceFacade.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId,
checkJobId);
result.setCheckSuccess(checkJobResults.isEmpty() ? null :
checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched));
result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each
-> !each.getValue().isIgnored() && !each.getValue().isMatched())
.map(Entry::getKey).collect(Collectors.joining(",")));
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 67b7f80a7dc..0036cb46113 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -113,7 +113,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
log.info("job {} with check algorithm '{}' data consistency
checker result: {}, stopping: {}",
parentJobId, checkJobConfig.getAlgorithmTypeName(),
checkResultMap, jobItemContext.isStopping());
if (!jobItemContext.isStopping()) {
- PipelineAPIFactory.getGovernanceRepositoryAPI(
+ PipelineAPIFactory.getPipelineGovernanceFacade(
PipelineJobIdUtils.parseContextKey(parentJobId)).getJobCheckGovernanceRepository().persistCheckJobResult(parentJobId,
checkJobId, checkResultMap);
}
} finally {
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index d18c30e45c6..66d16150c09 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -306,7 +306,7 @@ public final class MigrationJobAPI implements
InventoryIncrementalJobAPI {
}
private void dropCheckJobs(final String jobId) {
- Collection<String> checkJobIds =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().listCheckJobIds(jobId);
+ Collection<String> checkJobIds =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().listCheckJobIds(jobId);
if (checkJobIds.isEmpty()) {
return;
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 04cd7da0eb9..993009506e7 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -134,12 +134,12 @@ public final class MigrationJobPreparer {
if (lockContext.tryLock(lockDefinition, 600000)) {
log.info("try lock success, jobId={}, shardingItem={}, cost {}
ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
try {
- JobOffsetInfo offsetInfo =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().load(jobId);
+ JobOffsetInfo offsetInfo =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().load(jobId);
if (!offsetInfo.isTargetSchemaTableCreated()) {
jobItemContext.setStatus(JobStatus.PREPARING);
jobItemManager.updateStatus(jobId,
jobItemContext.getShardingItem(), JobStatus.PREPARING);
prepareAndCheckTarget(jobItemContext);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().persist(jobId,
new JobOffsetInfo(true));
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().persist(jobId,
new JobOffsetInfo(true));
}
} finally {
log.info("unlock, jobId={}, shardingItem={}, cost {} ms",
jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java
similarity index 74%
rename from
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
rename to
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java
index 0ff4eaa6273..75c9516d8fd 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java
@@ -21,7 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextMan
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
-import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineJobItemProcessGovernanceRepository;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
@@ -57,9 +57,9 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
-class GovernanceRepositoryAPIImplTest {
+class PipelineGovernanceFacadeTest {
- private static GovernanceRepositoryAPI governanceRepositoryAPI;
+ private static PipelineGovernanceFacade governanceFacade;
private static final AtomicReference<DataChangedEvent>
EVENT_ATOMIC_REFERENCE = new AtomicReference<>();
@@ -68,12 +68,12 @@ class GovernanceRepositoryAPIImplTest {
@BeforeAll
static void beforeClass() {
PipelineContextUtils.mockModeConfigAndContextManager();
- governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
+ governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
watch();
}
private static void watch() {
- governanceRepositoryAPI.watchPipeLineRootPath(event -> {
+ governanceFacade.watchPipeLineRootPath(event -> {
if ((PipelineNodePath.DATA_PIPELINE_ROOT +
"/1").equals(event.getKey())) {
EVENT_ATOMIC_REFERENCE.set(event);
COUNT_DOWN_LATCH.countDown();
@@ -96,7 +96,7 @@ class GovernanceRepositoryAPIImplTest {
void assertDeleteJob() {
ClusterPersistRepository clusterPersistRepository =
getClusterPersistRepository();
clusterPersistRepository.persist(PipelineNodePath.DATA_PIPELINE_ROOT +
"/1", "");
- governanceRepositoryAPI.getJobGovernanceRepository().delete("1");
+ governanceFacade.getJobGovernanceRepository().delete("1");
Optional<String> actual = new
PipelineJobItemProcessGovernanceRepository(clusterPersistRepository).load("1",
0);
assertFalse(actual.isPresent());
}
@@ -104,21 +104,21 @@ class GovernanceRepositoryAPIImplTest {
@Test
void assertIsExistedJobConfiguration() {
ClusterPersistRepository clusterPersistRepository =
getClusterPersistRepository();
-
assertFalse(governanceRepositoryAPI.getJobConfigurationGovernanceRepository().isExisted("foo_job"));
+
assertFalse(governanceFacade.getJobConfigurationGovernanceRepository().isExisted("foo_job"));
clusterPersistRepository.persist("/pipeline/jobs/foo_job/config",
"foo");
-
assertTrue(governanceRepositoryAPI.getJobConfigurationGovernanceRepository().isExisted("foo_job"));
+
assertTrue(governanceFacade.getJobConfigurationGovernanceRepository().isExisted("foo_job"));
}
@Test
void assertLatestCheckJobIdPersistenceDeletion() {
String parentJobId = "testParentJob";
String expectedCheckJobId = "testCheckJob";
-
governanceRepositoryAPI.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
expectedCheckJobId);
- Optional<String> actualCheckJobIdOpt =
governanceRepositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+
governanceFacade.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
expectedCheckJobId);
+ Optional<String> actualCheckJobIdOpt =
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
assertTrue(actualCheckJobIdOpt.isPresent());
assertThat(actualCheckJobIdOpt.get(), is(expectedCheckJobId));
-
governanceRepositoryAPI.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId);
-
assertFalse(governanceRepositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId).isPresent());
+
governanceFacade.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId);
+
assertFalse(governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId).isPresent());
}
@Test
@@ -126,8 +126,8 @@ class GovernanceRepositoryAPIImplTest {
MigrationJobItemContext jobItemContext = mockJobItemContext();
Map<String, TableDataConsistencyCheckResult> actual = new HashMap<>();
actual.put("test", new TableDataConsistencyCheckResult(true));
-
governanceRepositoryAPI.getJobCheckGovernanceRepository().persistCheckJobResult(jobItemContext.getJobId(),
"j02123", actual);
- Map<String, TableDataConsistencyCheckResult> checkResult =
governanceRepositoryAPI.getJobCheckGovernanceRepository().getCheckJobResult(jobItemContext.getJobId(),
"j02123");
+
governanceFacade.getJobCheckGovernanceRepository().persistCheckJobResult(jobItemContext.getJobId(),
"j02123", actual);
+ Map<String, TableDataConsistencyCheckResult> checkResult =
governanceFacade.getJobCheckGovernanceRepository().getCheckJobResult(jobItemContext.getJobId(),
"j02123");
assertThat(checkResult.size(), is(1));
assertTrue(checkResult.get("test").isMatched());
}
@@ -135,23 +135,23 @@ class GovernanceRepositoryAPIImplTest {
@Test
void assertPersistJobItemProcess() {
MigrationJobItemContext jobItemContext = mockJobItemContext();
-
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue1");
-
assertFalse(governanceRepositoryAPI.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
jobItemContext.getShardingItem()).isPresent());
-
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue1");
- Optional<String> actual =
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+
governanceFacade.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue1");
+
assertFalse(governanceFacade.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
jobItemContext.getShardingItem()).isPresent());
+
governanceFacade.getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue1");
+ Optional<String> actual =
governanceFacade.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
assertTrue(actual.isPresent());
assertThat(actual.get(), is("testValue1"));
-
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue2");
- actual =
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+
governanceFacade.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue2");
+ actual =
governanceFacade.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
assertTrue(actual.isPresent());
assertThat(actual.get(), is("testValue2"));
}
@Test
void assertPersistJobOffset() {
-
assertFalse(governanceRepositoryAPI.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated());
-
governanceRepositoryAPI.getJobOffsetGovernanceRepository().persist("1", new
JobOffsetInfo(true));
-
assertTrue(governanceRepositoryAPI.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated());
+
assertFalse(governanceFacade.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated());
+ governanceFacade.getJobOffsetGovernanceRepository().persist("1", new
JobOffsetInfo(true));
+
assertTrue(governanceFacade.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated());
}
private ClusterPersistRepository getClusterPersistRepository() {
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 075ac4950fe..437293bca13 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -51,7 +51,7 @@ class ConsistencyCheckJobTest {
ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new
PipelineContextKey(InstanceType.PROXY),
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
String checkJobId = pipelineJobId.marshal();
Map<String, Object> expectTableCheckPosition =
Collections.singletonMap("t_order", 100);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(checkJobId,
0,
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(checkJobId,
0,
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
ConsistencyCheckJob consistencyCheckJob = new
ConsistencyCheckJob(checkJobId);
ConsistencyCheckJobItemContext actual =
consistencyCheckJob.buildPipelineJobItemContext(
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index fd217a71ec8..309b944d451 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.test.it.data.pipeline.scenario.consistencychec
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
@@ -82,7 +82,7 @@ class ConsistencyCheckJobAPITest {
void assertDropByParentJobId() {
MigrationJobConfiguration parentJobConfig =
jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
String parentJobId = parentJobConfig.getJobId();
- GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
+ PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
int expectedSequence = 1;
for (int i = 0; i < 3; i++) {
String checkJobId = jobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(parentJobId, null, null,
@@ -91,20 +91,20 @@ class ConsistencyCheckJobAPITest {
new ConsistencyCheckJobConfiguration(checkJobId,
parentJobId, null, null, TypedSPILoader.getService(DatabaseType.class, "H2")),
0, JobStatus.FINISHED, null);
jobItemManager.persistProgress(checkJobItemContext);
Map<String, TableDataConsistencyCheckResult>
dataConsistencyCheckResult = Collections.singletonMap("t_order", new
TableDataConsistencyCheckResult(true));
-
repositoryAPI.getJobCheckGovernanceRepository().persistCheckJobResult(parentJobId,
checkJobId, dataConsistencyCheckResult);
- Optional<String> latestCheckJobId =
repositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+
governanceFacade.getJobCheckGovernanceRepository().persistCheckJobResult(parentJobId,
checkJobId, dataConsistencyCheckResult);
+ Optional<String> latestCheckJobId =
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
assertTrue(latestCheckJobId.isPresent());
assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()),
is(expectedSequence++));
}
expectedSequence = 2;
for (int i = 0; i < 2; i++) {
jobAPI.dropByParentJobId(parentJobId);
- Optional<String> latestCheckJobId =
repositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+ Optional<String> latestCheckJobId =
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
assertTrue(latestCheckJobId.isPresent());
assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()),
is(expectedSequence--));
}
jobAPI.dropByParentJobId(parentJobId);
- Optional<String> latestCheckJobId =
repositoryAPI.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+ Optional<String> latestCheckJobId =
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
assertFalse(latestCheckJobId.isPresent());
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 7e0cdfdab72..9c3418ef846 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -303,7 +303,7 @@ class MigrationJobAPITest {
YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new
YamlInventoryIncrementalJobItemProgress();
yamlJobItemProgress.setStatus(JobStatus.RUNNING.name());
yamlJobItemProgress.setSourceDatabaseType("MySQL");
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(),
0, YamlEngine.marshal(yamlJobItemProgress));
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(),
0, YamlEngine.marshal(yamlJobItemProgress));
List<InventoryIncrementalJobItemInfo> jobItemInfos =
inventoryIncrementalJobManager.getJobItemInfos(jobId.get());
assertThat(jobItemInfos.size(), is(1));
InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
@@ -320,7 +320,7 @@ class MigrationJobAPITest {
yamlJobItemProgress.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK.name());
yamlJobItemProgress.setProcessedRecordsCount(100);
yamlJobItemProgress.setInventoryRecordsCount(50);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(),
0, YamlEngine.marshal(yamlJobItemProgress));
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(),
0, YamlEngine.marshal(yamlJobItemProgress));
List<InventoryIncrementalJobItemInfo> jobItemInfos =
inventoryIncrementalJobManager.getJobItemInfos(jobId.get());
InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
assertThat(jobItemInfo.getJobItemProgress().getStatus(),
is(JobStatus.EXECUTE_INCREMENTAL_TASK));
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index d23437a551a..3389cb2555c 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -22,7 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextMan
import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
-import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
@@ -61,9 +61,9 @@ class MigrationDataConsistencyCheckerTest {
jobConfigurationPOJO.setJobParameter(YamlEngine.marshal(new
YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
jobConfigurationPOJO.setJobName(jobConfig.getJobId());
jobConfigurationPOJO.setShardingTotalCount(1);
- GovernanceRepositoryAPI governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
+ PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config",
jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
-
governanceRepositoryAPI.getJobItemProcessGovernanceRepository().persist(jobConfig.getJobId(),
0, "");
+
governanceFacade.getJobItemProcessGovernanceRepository().persist(jobConfig.getJobId(),
0, "");
Map<String, TableDataConsistencyCheckResult> actual = new
MigrationDataConsistencyChecker(jobConfig, new
MigrationProcessContext(jobConfig.getJobId(), null),
createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE",
null);
String checkKey = "t_order";