This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 56ebfb13634 Refactor PipelineGovernanceFacade (#29142)
56ebfb13634 is described below
commit 56ebfb136342f05230fce2c2f28fac9ddc946e00
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 23 19:46:37 2023 +0800
Refactor PipelineGovernanceFacade (#29142)
---
.../repository/PipelineGovernanceFacade.java | 30 +++++-----------
...ineJobItemErrorMessageGovernanceRepository.java | 2 +-
.../PipelineJobItemFacade.java} | 34 ++++++------------
...PipelineJobItemProcessGovernanceRepository.java | 2 +-
.../PipelineJobCheckGovernanceRepository.java | 2 +-
...pelineJobConfigurationGovernanceRepository.java | 2 +-
.../PipelineJobFacade.java} | 40 +++++++++-------------
.../{ => job}/PipelineJobGovernanceRepository.java | 2 +-
.../PipelineJobOffsetGovernanceRepository.java | 2 +-
...lineMetaDataDataSourceGovernanceRepository.java | 2 +-
.../PipelineMetaDataFacade.java} | 34 ++++++------------
...taProcessConfigurationGovernanceRepository.java | 2 +-
.../service/PipelineJobIteErrorMessageManager.java | 6 ++--
.../core/job/service/PipelineJobItemManager.java | 8 ++---
.../core/job/service/PipelineJobManager.java | 12 +++----
.../metadata/PipelineDataSourcePersistService.java | 4 +--
...PipelineProcessConfigurationPersistService.java | 4 +--
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 8 ++---
.../api/impl/ConsistencyCheckJobAPI.java | 24 ++++++-------
.../task/ConsistencyCheckTasksRunner.java | 2 +-
.../migration/api/impl/MigrationJobAPI.java | 2 +-
.../migration/prepare/MigrationJobPreparer.java | 4 +--
.../job/service/PipelineGovernanceFacadeTest.java | 38 ++++++++++----------
.../consistencycheck/ConsistencyCheckJobTest.java | 2 +-
.../api/impl/ConsistencyCheckJobAPITest.java | 8 ++---
.../migration/api/impl/MigrationJobAPITest.java | 4 +--
.../MigrationDataConsistencyCheckerTest.java | 2 +-
27 files changed, 117 insertions(+), 165 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java
index a2b33c35d69..a96baaf20a2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java
@@ -20,6 +20,9 @@ package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository
import lombok.AccessLevel;
import lombok.Getter;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
+import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item.PipelineJobItemFacade;
+import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job.PipelineJobFacade;
+import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.metadata.PipelineMetaDataFacade;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -32,32 +35,17 @@ public final class PipelineGovernanceFacade {
@Getter(AccessLevel.NONE)
private final ClusterPersistRepository repository;
- private final PipelineJobConfigurationGovernanceRepository
jobConfigurationGovernanceRepository;
+ private final PipelineJobFacade jobFacade;
- private final PipelineJobOffsetGovernanceRepository
jobOffsetGovernanceRepository;
+ private final PipelineJobItemFacade jobItemFacade;
- private final PipelineJobItemProcessGovernanceRepository
jobItemProcessGovernanceRepository;
-
- private final PipelineJobItemErrorMessageGovernanceRepository
jobItemErrorMessageGovernanceRepository;
-
- private final PipelineJobCheckGovernanceRepository
jobCheckGovernanceRepository;
-
- private final PipelineJobGovernanceRepository jobGovernanceRepository;
-
- private final PipelineMetaDataDataSourceGovernanceRepository
metaDataDataSourceGovernanceRepository;
-
- private final PipelineMetaDataProcessConfigurationGovernanceRepository
metaDataProcessConfigurationGovernanceRepository;
+ private final PipelineMetaDataFacade metaDataFacade;
public PipelineGovernanceFacade(final ClusterPersistRepository repository)
{
this.repository = repository;
- jobConfigurationGovernanceRepository = new
PipelineJobConfigurationGovernanceRepository(repository);
- jobOffsetGovernanceRepository = new
PipelineJobOffsetGovernanceRepository(repository);
- jobItemProcessGovernanceRepository = new
PipelineJobItemProcessGovernanceRepository(repository);
- jobItemErrorMessageGovernanceRepository = new
PipelineJobItemErrorMessageGovernanceRepository(repository);
- jobCheckGovernanceRepository = new
PipelineJobCheckGovernanceRepository(repository);
- jobGovernanceRepository = new
PipelineJobGovernanceRepository(repository);
- metaDataDataSourceGovernanceRepository = new
PipelineMetaDataDataSourceGovernanceRepository(repository);
- metaDataProcessConfigurationGovernanceRepository = new
PipelineMetaDataProcessConfigurationGovernanceRepository(repository);
+ jobFacade = new PipelineJobFacade(repository);
+ jobItemFacade = new PipelineJobItemFacade(repository);
+ metaDataFacade = new PipelineMetaDataFacade(repository);
}
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemErrorMessageGovernanceRepository.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
similarity index 98%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemErrorMessageGovernanceRepository.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
index eee1bd77b57..213302fa88f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemErrorMessageGovernanceRepository.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item;
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemFacade.java
similarity index 53%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemFacade.java
index a403f0f7b0e..0bf38b96a22 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemFacade.java
@@ -15,37 +15,23 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import lombok.Getter;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
/**
- * Pipeline job governance repository.
+ * Pipeline job item facade.
*/
-@RequiredArgsConstructor
-public final class PipelineJobGovernanceRepository {
+@Getter
+public final class PipelineJobItemFacade {
- private final ClusterPersistRepository repository;
+ private final PipelineJobItemProcessGovernanceRepository process;
- /**
- * Create job.
- *
- * @param jobId job ID
- * @param jobClass job class
- */
- public void create(final String jobId, final Class<? extends PipelineJob>
jobClass) {
- repository.persist(PipelineMetaDataNode.getJobRootPath(jobId),
jobClass.getName());
- }
+ private final PipelineJobItemErrorMessageGovernanceRepository errorMessage;
- /**
- * Delete job.
- *
- * @param jobId job id
- */
- public void delete(final String jobId) {
- repository.delete(PipelineMetaDataNode.getJobRootPath(jobId));
+ public PipelineJobItemFacade(final ClusterPersistRepository repository) {
+ process = new PipelineJobItemProcessGovernanceRepository(repository);
+ errorMessage = new
PipelineJobItemErrorMessageGovernanceRepository(repository);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemProcessGovernanceRepository.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemProcessGovernanceRepository.java
similarity index 99%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemProcessGovernanceRepository.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemProcessGovernanceRepository.java
index 7ea4887fdb8..6beb8cfc8db 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemProcessGovernanceRepository.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemProcessGovernanceRepository.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item;
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobCheckGovernanceRepository.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobCheckGovernanceRepository.java
similarity index 99%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobCheckGovernanceRepository.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobCheckGovernanceRepository.java
index e5c44dfdd3c..15ae3ea230b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobCheckGovernanceRepository.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobCheckGovernanceRepository.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job;
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobConfigurationGovernanceRepository.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobConfigurationGovernanceRepository.java
similarity index 98%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobConfigurationGovernanceRepository.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobConfigurationGovernanceRepository.java
index 9b6d13bf983..1c4a1af066b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobConfigurationGovernanceRepository.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobConfigurationGovernanceRepository.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job;
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobFacade.java
similarity index 53%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobFacade.java
index a403f0f7b0e..9e101ece185 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobFacade.java
@@ -15,37 +15,29 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import lombok.Getter;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
/**
- * Pipeline job governance repository.
+ * Pipeline job facade.
*/
-@RequiredArgsConstructor
-public final class PipelineJobGovernanceRepository {
+@Getter
+public final class PipelineJobFacade {
- private final ClusterPersistRepository repository;
+ private final PipelineJobGovernanceRepository job;
- /**
- * Create job.
- *
- * @param jobId job ID
- * @param jobClass job class
- */
- public void create(final String jobId, final Class<? extends PipelineJob>
jobClass) {
- repository.persist(PipelineMetaDataNode.getJobRootPath(jobId),
jobClass.getName());
- }
+ private final PipelineJobConfigurationGovernanceRepository configuration;
+
+ private final PipelineJobOffsetGovernanceRepository offset;
+
+ private final PipelineJobCheckGovernanceRepository check;
- /**
- * Delete job.
- *
- * @param jobId job id
- */
- public void delete(final String jobId) {
- repository.delete(PipelineMetaDataNode.getJobRootPath(jobId));
+ public PipelineJobFacade(final ClusterPersistRepository repository) {
+ job = new PipelineJobGovernanceRepository(repository);
+ configuration = new
PipelineJobConfigurationGovernanceRepository(repository);
+ offset = new PipelineJobOffsetGovernanceRepository(repository);
+ check = new PipelineJobCheckGovernanceRepository(repository);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobGovernanceRepository.java
similarity index 98%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobGovernanceRepository.java
index a403f0f7b0e..606020cf405 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobGovernanceRepository.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobOffsetGovernanceRepository.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobOffsetGovernanceRepository.java
similarity index 99%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobOffsetGovernanceRepository.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobOffsetGovernanceRepository.java
index c007b67574f..c75ace6e8ca 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobOffsetGovernanceRepository.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobOffsetGovernanceRepository.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job;
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataDataSourceGovernanceRepository.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataDataSourceGovernanceRepository.java
similarity index 98%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataDataSourceGovernanceRepository.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataDataSourceGovernanceRepository.java
index 870cc0c20dc..70a973c8be5 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataDataSourceGovernanceRepository.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataDataSourceGovernanceRepository.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.metadata;
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataFacade.java
similarity index 53%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataFacade.java
index a403f0f7b0e..4d3a6a430a9 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataFacade.java
@@ -15,37 +15,23 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.metadata;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import lombok.Getter;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
/**
- * Pipeline job governance repository.
+ * Pipeline meta data facade.
*/
-@RequiredArgsConstructor
-public final class PipelineJobGovernanceRepository {
+@Getter
+public final class PipelineMetaDataFacade {
- private final ClusterPersistRepository repository;
+ private final PipelineMetaDataDataSourceGovernanceRepository dataSource;
- /**
- * Create job.
- *
- * @param jobId job ID
- * @param jobClass job class
- */
- public void create(final String jobId, final Class<? extends PipelineJob>
jobClass) {
- repository.persist(PipelineMetaDataNode.getJobRootPath(jobId),
jobClass.getName());
- }
+ private final PipelineMetaDataProcessConfigurationGovernanceRepository
processConfiguration;
- /**
- * Delete job.
- *
- * @param jobId job id
- */
- public void delete(final String jobId) {
- repository.delete(PipelineMetaDataNode.getJobRootPath(jobId));
+ public PipelineMetaDataFacade(final ClusterPersistRepository repository) {
+ dataSource = new
PipelineMetaDataDataSourceGovernanceRepository(repository);
+ processConfiguration = new
PipelineMetaDataProcessConfigurationGovernanceRepository(repository);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataProcessConfigurationGovernanceRepository.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataProcessConfigurationGovernanceRepository.java
similarity index 98%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataProcessConfigurationGovernanceRepository.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataProcessConfigurationGovernanceRepository.java
index e6f3e1ffb58..6f2802e735b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataProcessConfigurationGovernanceRepository.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataProcessConfigurationGovernanceRepository.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.metadata;
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
index b63e9b5b269..1066c68d9f3 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
@@ -46,7 +46,7 @@ public final class PipelineJobIteErrorMessageManager {
* @return error message, or empty string if no error message is stored
*/
public String getErrorMessage() {
- return
Optional.ofNullable(governanceFacade.getJobItemErrorMessageGovernanceRepository().load(jobId,
shardingItem)).orElse("");
+ return
Optional.ofNullable(governanceFacade.getJobItemFacade().getErrorMessage().load(jobId,
shardingItem)).orElse("");
}
/**
@@ -55,7 +55,7 @@ public final class PipelineJobIteErrorMessageManager {
* @param error error
*/
public void updateErrorMessage(final Object error) {
-
governanceFacade.getJobItemErrorMessageGovernanceRepository().update(jobId,
shardingItem, null == error ? "" : buildErrorMessage(error));
+ governanceFacade.getJobItemFacade().getErrorMessage().update(jobId,
shardingItem, null == error ? "" : buildErrorMessage(error));
}
private String buildErrorMessage(final Object error) {
@@ -66,6 +66,6 @@ public final class PipelineJobIteErrorMessageManager {
* Clean job item error message.
*/
public void cleanErrorMessage() {
-
governanceFacade.getJobItemErrorMessageGovernanceRepository().update(jobId,
shardingItem, "");
+ governanceFacade.getJobItemFacade().getErrorMessage().update(jobId,
shardingItem, "");
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
index d2463bc7f5f..c9643d284b1 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
@@ -55,7 +55,7 @@ public final class PipelineJobItemManager<T extends
PipelineJobItemProgress> {
}
jobItemProgress.get().setStatus(status);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId))
- .getJobItemProcessGovernanceRepository().update(jobId,
shardingItem,
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
+ .getJobItemFacade().getProcess().update(jobId, shardingItem,
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
}
/**
@@ -66,7 +66,7 @@ public final class PipelineJobItemManager<T extends
PipelineJobItemProgress> {
* @return job item progress
*/
public Optional<T> getProgress(final String jobId, final int shardingItem)
{
- return
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().load(jobId,
shardingItem)
+ return
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().load(jobId,
shardingItem)
.map(optional ->
swapper.swapToObject(YamlEngine.unmarshal(optional,
swapper.getYamlProgressClass(), true)));
}
@@ -77,7 +77,7 @@ public final class PipelineJobItemManager<T extends
PipelineJobItemProgress> {
*/
public void persistProgress(final PipelineJobItemContext jobItemContext) {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
-
.getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
+
.getJobItemFacade().getProcess().persist(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
}
/**
@@ -87,7 +87,7 @@ public final class PipelineJobItemManager<T extends
PipelineJobItemProgress> {
*/
public void updateProgress(final PipelineJobItemContext jobItemContext) {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
-
.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
+
.getJobItemFacade().getProcess().update(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
}
@SuppressWarnings("unchecked")
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 59d205b8528..1a11a902d52 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -75,12 +75,12 @@ public final class PipelineJobManager {
String jobId = jobConfig.getJobId();
ShardingSpherePreconditions.checkState(0 !=
jobConfig.getJobShardingCount(), () -> new
PipelineJobCreationWithInvalidShardingCountException(jobId));
PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
- if
(governanceFacade.getJobConfigurationGovernanceRepository().isExisted(jobId)) {
+ if
(governanceFacade.getJobFacade().getConfiguration().isExisted(jobId)) {
log.warn("jobId already exists in registry center, ignore, job id
is `{}`", jobId);
return Optional.of(jobId);
}
- governanceFacade.getJobGovernanceRepository().create(jobId,
jobAPI.getJobClass());
-
governanceFacade.getJobConfigurationGovernanceRepository().persist(jobId,
jobConfig.convertToJobConfigurationPOJO());
+ governanceFacade.getJobFacade().getJob().create(jobId,
jobAPI.getJobClass());
+ governanceFacade.getJobFacade().getConfiguration().persist(jobId,
jobConfig.convertToJobConfigurationPOJO());
return Optional.of(jobId);
}
@@ -119,7 +119,7 @@ public final class PipelineJobManager {
}
private void startNextDisabledJob(final String jobId, final String
toBeStartDisabledNextJobType) {
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional
-> {
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional
-> {
try {
new
PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
toBeStartDisabledNextJobType)).startDisabledJob(optional);
// CHECKSTYLE:OFF
@@ -141,7 +141,7 @@ public final class PipelineJobManager {
}
private void stopPreviousJob(final String jobId, final String
toBeStoppedPreviousJobType) {
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional
-> {
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional
-> {
try {
new
PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
toBeStoppedPreviousJobType)).stop(optional);
// CHECKSTYLE:OFF
@@ -176,7 +176,7 @@ public final class PipelineJobManager {
public void drop(final String jobId) {
PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(jobId);
PipelineAPIFactory.getJobOperateAPI(contextKey).remove(String.valueOf(jobId),
null);
-
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobGovernanceRepository().delete(jobId);
+
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getJob().delete(jobId);
}
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
index a951f9fee3c..924ae8859a9 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
@@ -39,7 +39,7 @@ public final class PipelineDataSourcePersistService
implements PipelineMetaDataP
@Override
@SuppressWarnings("unchecked")
public Map<String, DataSourcePoolProperties> load(final PipelineContextKey
contextKey, final String jobType) {
- String dataSourcesProps =
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataDataSourceGovernanceRepository().load(jobType);
+ String dataSourcesProps =
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataFacade().getDataSource().load(jobType);
if (Strings.isNullOrEmpty(dataSourcesProps)) {
return Collections.emptyMap();
}
@@ -55,6 +55,6 @@ public final class PipelineDataSourcePersistService
implements PipelineMetaDataP
for (Entry<String, DataSourcePoolProperties> entry :
propsMap.entrySet()) {
dataSourceMap.put(entry.getKey(),
swapper.swapToMap(entry.getValue()));
}
-
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataDataSourceGovernanceRepository().persist(jobType,
YamlEngine.marshal(dataSourceMap));
+
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataFacade().getDataSource().persist(jobType,
YamlEngine.marshal(dataSourceMap));
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
index 5b8c1bce798..97954a5cc96 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
@@ -34,7 +34,7 @@ public final class PipelineProcessConfigurationPersistService
implements Pipelin
@Override
public PipelineProcessConfiguration load(final PipelineContextKey
contextKey, final String jobType) {
- String yamlText =
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataProcessConfigurationGovernanceRepository().load(jobType);
+ String yamlText =
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataFacade().getProcessConfiguration().load(jobType);
if (Strings.isNullOrEmpty(yamlText)) {
return null;
}
@@ -45,6 +45,6 @@ public final class PipelineProcessConfigurationPersistService
implements Pipelin
@Override
public void persist(final PipelineContextKey contextKey, final String
jobType, final PipelineProcessConfiguration processConfig) {
String yamlText =
YamlEngine.marshal(swapper.swapToYamlConfiguration(processConfig));
-
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataProcessConfigurationGovernanceRepository().persist(jobType,
yamlText);
+
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataFacade().getProcessConfiguration().persist(jobType,
yamlText);
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 0b4a06dd368..614d06c9f40 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -121,13 +121,13 @@ public final class CDCJobAPI implements
InventoryIncrementalJobAPI {
CDCJobConfiguration jobConfig = new
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
ShardingSpherePreconditions.checkState(0 !=
jobConfig.getJobShardingCount(), () -> new
PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
- if
(governanceFacade.getJobConfigurationGovernanceRepository().isExisted(jobConfig.getJobId()))
{
+ if
(governanceFacade.getJobFacade().getConfiguration().isExisted(jobConfig.getJobId()))
{
log.warn("CDC job already exists in registry center, ignore, job
id is `{}`", jobConfig.getJobId());
} else {
-
governanceFacade.getJobGovernanceRepository().create(jobConfig.getJobId(),
getJobClass());
+
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(),
getJobClass());
JobConfigurationPOJO jobConfigPOJO =
jobConfig.convertToJobConfigurationPOJO();
jobConfigPOJO.setDisabled(true);
-
governanceFacade.getJobConfigurationGovernanceRepository().persist(jobConfig.getJobId(),
jobConfigPOJO);
+
governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(),
jobConfigPOJO);
if (!param.isFull()) {
initIncrementalPosition(jobConfig);
}
@@ -177,7 +177,7 @@ public final class CDCJobAPI implements
InventoryIncrementalJobAPI {
}
IncrementalDumperContext dumperContext =
buildDumperContext(jobConfig, i, new
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
InventoryIncrementalJobItemProgress jobItemProgress =
getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager,
dumperContext);
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().persist(
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist(
jobId, i,
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
}
} catch (final SQLException ex) {
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index d9a48111722..a9a846bed49 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -80,7 +80,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
public String createJobAndStart(final CreateConsistencyCheckJobParameter
param) {
String parentJobId = param.getParentJobId();
PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId));
- Optional<String> latestCheckJobId =
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+ Optional<String> latestCheckJobId =
governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
if (latestCheckJobId.isPresent()) {
PipelineJobItemManager<ConsistencyCheckJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
Optional<ConsistencyCheckJobItemProgress> progress =
jobItemManager.getProgress(latestCheckJobId.get(), 0);
@@ -92,8 +92,8 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
verifyPipelineDatabaseType(param);
PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(parentJobId);
String result = latestCheckJobId.map(s -> new
ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new
ConsistencyCheckJobId(contextKey, parentJobId)).marshal();
-
governanceFacade.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
result);
-
governanceFacade.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId,
result);
+
governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId,
result);
+
governanceFacade.getJobFacade().getCheck().deleteCheckJobResult(parentJobId,
result);
new PipelineJobManager(this).drop(result);
YamlConsistencyCheckJobConfiguration yamlConfig = new
YamlConsistencyCheckJobConfiguration();
yamlConfig.setJobId(result);
@@ -126,7 +126,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
}
private String getLatestCheckJobId(final String parentJobId) {
- Optional<String> result =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+ Optional<String> result =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(result.isPresent(), () -> new
ConsistencyCheckJobNotFoundException(parentJobId));
return result.get();
}
@@ -150,16 +150,16 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
new PipelineJobManager(this).stop(latestCheckJobId);
PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(parentJobId);
PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
- Collection<String> checkJobIds =
governanceFacade.getJobCheckGovernanceRepository().listCheckJobIds(parentJobId);
+ Collection<String> checkJobIds =
governanceFacade.getJobFacade().getCheck().listCheckJobIds(parentJobId);
Optional<Integer> previousSequence =
ConsistencyCheckSequence.getPreviousSequence(
checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()),
ConsistencyCheckJobId.parseSequence(latestCheckJobId));
if (previousSequence.isPresent()) {
String checkJobId = new ConsistencyCheckJobId(contextKey,
parentJobId, previousSequence.get()).marshal();
-
governanceFacade.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
checkJobId);
+
governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId,
checkJobId);
} else {
-
governanceFacade.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId);
+
governanceFacade.getJobFacade().getCheck().deleteLatestCheckJobId(parentJobId);
}
-
governanceFacade.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId,
latestCheckJobId);
+
governanceFacade.getJobFacade().getCheck().deleteCheckJobResult(parentJobId,
latestCheckJobId);
new PipelineJobManager(this).drop(latestCheckJobId);
}
@@ -171,7 +171,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
*/
public List<ConsistencyCheckJobItemInfo> getJobItemInfos(final String
parentJobId) {
PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId));
- Optional<String> latestCheckJobId =
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+ Optional<String> latestCheckJobId =
governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(),
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
String checkJobId = latestCheckJobId.get();
PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
@@ -182,7 +182,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
List<ConsistencyCheckJobItemInfo> result = new LinkedList<>();
ConsistencyCheckJobItemProgress jobItemProgress = progress.get();
if (!Strings.isNullOrEmpty(jobItemProgress.getIgnoredTableNames())) {
- Map<String, TableDataConsistencyCheckResult> checkJobResult =
governanceFacade.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId,
latestCheckJobId.get());
+ Map<String, TableDataConsistencyCheckResult> checkJobResult =
governanceFacade.getJobFacade().getCheck().getCheckJobResult(parentJobId,
latestCheckJobId.get());
result.addAll(buildIgnoredTableInfo(jobItemProgress.getIgnoredTableNames().split(","),
checkJobResult));
}
if (Objects.equals(jobItemProgress.getIgnoredTableNames(),
jobItemProgress.getTableNames())) {
@@ -212,7 +212,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
private ConsistencyCheckJobItemInfo getJobItemInfo(final String
parentJobId) {
PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId));
- Optional<String> latestCheckJobId =
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+ Optional<String> latestCheckJobId =
governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(),
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
String checkJobId = latestCheckJobId.get();
PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
@@ -233,7 +233,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
result.setErrorMessage(new
PipelineJobIteErrorMessageManager(checkJobId, 0).getErrorMessage());
- Map<String, TableDataConsistencyCheckResult> checkJobResults =
governanceFacade.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId,
checkJobId);
+ Map<String, TableDataConsistencyCheckResult> checkJobResults =
governanceFacade.getJobFacade().getCheck().getCheckJobResult(parentJobId,
checkJobId);
result.setCheckSuccess(checkJobResults.isEmpty() ? null :
checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched));
result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each
-> !each.getValue().isIgnored() && !each.getValue().isMatched())
.map(Entry::getKey).collect(Collectors.joining(",")));
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 0036cb46113..f705d6b01ab 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -114,7 +114,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
parentJobId, checkJobConfig.getAlgorithmTypeName(),
checkResultMap, jobItemContext.isStopping());
if (!jobItemContext.isStopping()) {
PipelineAPIFactory.getPipelineGovernanceFacade(
-
PipelineJobIdUtils.parseContextKey(parentJobId)).getJobCheckGovernanceRepository().persistCheckJobResult(parentJobId,
checkJobId, checkResultMap);
+
PipelineJobIdUtils.parseContextKey(parentJobId)).getJobFacade().getCheck().persistCheckJobResult(parentJobId,
checkJobId, checkResultMap);
}
} finally {
jobItemContext.getProgressContext().setCheckEndTimeMillis(System.currentTimeMillis());
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 66d16150c09..e7e4f0bf06e 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -306,7 +306,7 @@ public final class MigrationJobAPI implements
InventoryIncrementalJobAPI {
}
private void dropCheckJobs(final String jobId) {
- Collection<String> checkJobIds =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().listCheckJobIds(jobId);
+ Collection<String> checkJobIds =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().listCheckJobIds(jobId);
if (checkJobIds.isEmpty()) {
return;
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 993009506e7..26918b29f9b 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -134,12 +134,12 @@ public final class MigrationJobPreparer {
if (lockContext.tryLock(lockDefinition, 600000)) {
log.info("try lock success, jobId={}, shardingItem={}, cost {}
ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
try {
- JobOffsetInfo offsetInfo =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().load(jobId);
+ JobOffsetInfo offsetInfo =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getOffset().load(jobId);
if (!offsetInfo.isTargetSchemaTableCreated()) {
jobItemContext.setStatus(JobStatus.PREPARING);
jobItemManager.updateStatus(jobId,
jobItemContext.getShardingItem(), JobStatus.PREPARING);
prepareAndCheckTarget(jobItemContext);
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().persist(jobId,
new JobOffsetInfo(true));
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getOffset().persist(jobId,
new JobOffsetInfo(true));
}
} finally {
log.info("unlock, jobId={}, shardingItem={}, cost {} ms",
jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java
index 75c9516d8fd..424f48d82c9 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java
@@ -22,7 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.common.ingest.position.Placeholde
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
-import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineJobItemProcessGovernanceRepository;
+import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item.PipelineJobItemProcessGovernanceRepository;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
@@ -96,7 +96,7 @@ class PipelineGovernanceFacadeTest {
void assertDeleteJob() {
ClusterPersistRepository clusterPersistRepository =
getClusterPersistRepository();
clusterPersistRepository.persist(PipelineNodePath.DATA_PIPELINE_ROOT +
"/1", "");
- governanceFacade.getJobGovernanceRepository().delete("1");
+ governanceFacade.getJobFacade().getJob().delete("1");
Optional<String> actual = new
PipelineJobItemProcessGovernanceRepository(clusterPersistRepository).load("1",
0);
assertFalse(actual.isPresent());
}
@@ -104,21 +104,21 @@ class PipelineGovernanceFacadeTest {
@Test
void assertIsExistedJobConfiguration() {
ClusterPersistRepository clusterPersistRepository =
getClusterPersistRepository();
-
assertFalse(governanceFacade.getJobConfigurationGovernanceRepository().isExisted("foo_job"));
+
assertFalse(governanceFacade.getJobFacade().getConfiguration().isExisted("foo_job"));
clusterPersistRepository.persist("/pipeline/jobs/foo_job/config",
"foo");
-
assertTrue(governanceFacade.getJobConfigurationGovernanceRepository().isExisted("foo_job"));
+
assertTrue(governanceFacade.getJobFacade().getConfiguration().isExisted("foo_job"));
}
@Test
void assertLatestCheckJobIdPersistenceDeletion() {
String parentJobId = "testParentJob";
String expectedCheckJobId = "testCheckJob";
-
governanceFacade.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId,
expectedCheckJobId);
- Optional<String> actualCheckJobIdOpt =
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+
governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId,
expectedCheckJobId);
+ Optional<String> actualCheckJobIdOpt =
governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
assertTrue(actualCheckJobIdOpt.isPresent());
assertThat(actualCheckJobIdOpt.get(), is(expectedCheckJobId));
-
governanceFacade.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId);
-
assertFalse(governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId).isPresent());
+
governanceFacade.getJobFacade().getCheck().deleteLatestCheckJobId(parentJobId);
+
assertFalse(governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId).isPresent());
}
@Test
@@ -126,8 +126,8 @@ class PipelineGovernanceFacadeTest {
MigrationJobItemContext jobItemContext = mockJobItemContext();
Map<String, TableDataConsistencyCheckResult> actual = new HashMap<>();
actual.put("test", new TableDataConsistencyCheckResult(true));
-
governanceFacade.getJobCheckGovernanceRepository().persistCheckJobResult(jobItemContext.getJobId(),
"j02123", actual);
- Map<String, TableDataConsistencyCheckResult> checkResult =
governanceFacade.getJobCheckGovernanceRepository().getCheckJobResult(jobItemContext.getJobId(),
"j02123");
+
governanceFacade.getJobFacade().getCheck().persistCheckJobResult(jobItemContext.getJobId(),
"j02123", actual);
+ Map<String, TableDataConsistencyCheckResult> checkResult =
governanceFacade.getJobFacade().getCheck().getCheckJobResult(jobItemContext.getJobId(),
"j02123");
assertThat(checkResult.size(), is(1));
assertTrue(checkResult.get("test").isMatched());
}
@@ -135,23 +135,23 @@ class PipelineGovernanceFacadeTest {
@Test
void assertPersistJobItemProcess() {
MigrationJobItemContext jobItemContext = mockJobItemContext();
-
governanceFacade.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue1");
-
assertFalse(governanceFacade.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
jobItemContext.getShardingItem()).isPresent());
-
governanceFacade.getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue1");
- Optional<String> actual =
governanceFacade.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+
governanceFacade.getJobItemFacade().getProcess().update(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue1");
+
assertFalse(governanceFacade.getJobItemFacade().getProcess().load(jobItemContext.getJobId(),
jobItemContext.getShardingItem()).isPresent());
+
governanceFacade.getJobItemFacade().getProcess().persist(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue1");
+ Optional<String> actual =
governanceFacade.getJobItemFacade().getProcess().load(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
assertTrue(actual.isPresent());
assertThat(actual.get(), is("testValue1"));
-
governanceFacade.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue2");
- actual =
governanceFacade.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+
governanceFacade.getJobItemFacade().getProcess().update(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue2");
+ actual =
governanceFacade.getJobItemFacade().getProcess().load(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
assertTrue(actual.isPresent());
assertThat(actual.get(), is("testValue2"));
}
@Test
void assertPersistJobOffset() {
-
assertFalse(governanceFacade.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated());
- governanceFacade.getJobOffsetGovernanceRepository().persist("1", new
JobOffsetInfo(true));
-
assertTrue(governanceFacade.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated());
+
assertFalse(governanceFacade.getJobFacade().getOffset().load("1").isTargetSchemaTableCreated());
+ governanceFacade.getJobFacade().getOffset().persist("1", new
JobOffsetInfo(true));
+
assertTrue(governanceFacade.getJobFacade().getOffset().load("1").isTargetSchemaTableCreated());
}
private ClusterPersistRepository getClusterPersistRepository() {
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 437293bca13..3802e92bcd3 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -51,7 +51,7 @@ class ConsistencyCheckJobTest {
ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new
PipelineContextKey(InstanceType.PROXY),
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
String checkJobId = pipelineJobId.marshal();
Map<String, Object> expectTableCheckPosition =
Collections.singletonMap("t_order", 100);
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(checkJobId,
0,
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId,
0,
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
ConsistencyCheckJob consistencyCheckJob = new
ConsistencyCheckJob(checkJobId);
ConsistencyCheckJobItemContext actual =
consistencyCheckJob.buildPipelineJobItemContext(
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index 309b944d451..b0ff58bbfaf 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -91,20 +91,20 @@ class ConsistencyCheckJobAPITest {
new ConsistencyCheckJobConfiguration(checkJobId,
parentJobId, null, null, TypedSPILoader.getService(DatabaseType.class, "H2")),
0, JobStatus.FINISHED, null);
jobItemManager.persistProgress(checkJobItemContext);
Map<String, TableDataConsistencyCheckResult>
dataConsistencyCheckResult = Collections.singletonMap("t_order", new
TableDataConsistencyCheckResult(true));
-
governanceFacade.getJobCheckGovernanceRepository().persistCheckJobResult(parentJobId,
checkJobId, dataConsistencyCheckResult);
- Optional<String> latestCheckJobId =
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+
governanceFacade.getJobFacade().getCheck().persistCheckJobResult(parentJobId,
checkJobId, dataConsistencyCheckResult);
+ Optional<String> latestCheckJobId =
governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
assertTrue(latestCheckJobId.isPresent());
assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()),
is(expectedSequence++));
}
expectedSequence = 2;
for (int i = 0; i < 2; i++) {
jobAPI.dropByParentJobId(parentJobId);
- Optional<String> latestCheckJobId =
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+ Optional<String> latestCheckJobId =
governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
assertTrue(latestCheckJobId.isPresent());
assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()),
is(expectedSequence--));
}
jobAPI.dropByParentJobId(parentJobId);
- Optional<String> latestCheckJobId =
governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId);
+ Optional<String> latestCheckJobId =
governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
assertFalse(latestCheckJobId.isPresent());
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 9c3418ef846..be69e9dd0eb 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -303,7 +303,7 @@ class MigrationJobAPITest {
YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new
YamlInventoryIncrementalJobItemProgress();
yamlJobItemProgress.setStatus(JobStatus.RUNNING.name());
yamlJobItemProgress.setSourceDatabaseType("MySQL");
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(),
0, YamlEngine.marshal(yamlJobItemProgress));
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobId.get(),
0, YamlEngine.marshal(yamlJobItemProgress));
List<InventoryIncrementalJobItemInfo> jobItemInfos =
inventoryIncrementalJobManager.getJobItemInfos(jobId.get());
assertThat(jobItemInfos.size(), is(1));
InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
@@ -320,7 +320,7 @@ class MigrationJobAPITest {
yamlJobItemProgress.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK.name());
yamlJobItemProgress.setProcessedRecordsCount(100);
yamlJobItemProgress.setInventoryRecordsCount(50);
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(),
0, YamlEngine.marshal(yamlJobItemProgress));
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobId.get(),
0, YamlEngine.marshal(yamlJobItemProgress));
List<InventoryIncrementalJobItemInfo> jobItemInfos =
inventoryIncrementalJobManager.getJobItemInfos(jobId.get());
InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
assertThat(jobItemInfo.getJobItemProgress().getStatus(),
is(JobStatus.EXECUTE_INCREMENTAL_TASK));
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 3389cb2555c..64d9d193213 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -63,7 +63,7 @@ class MigrationDataConsistencyCheckerTest {
jobConfigurationPOJO.setShardingTotalCount(1);
PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config",
jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
-
governanceFacade.getJobItemProcessGovernanceRepository().persist(jobConfig.getJobId(),
0, "");
+
governanceFacade.getJobItemFacade().getProcess().persist(jobConfig.getJobId(),
0, "");
Map<String, TableDataConsistencyCheckResult> actual = new
MigrationDataConsistencyChecker(jobConfig, new
MigrationProcessContext(jobConfig.getJobId(), null),
createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE",
null);
String checkKey = "t_order";