This is an automated email from the ASF dual-hosted git repository.
chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 93ffce1787c Refactor GovernanceRepositoryAPI (#29101)
93ffce1787c is described below
commit 93ffce1787cf1a4bc08f3f4966c88d64029f6193
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Nov 21 13:22:51 2023 +0800
Refactor GovernanceRepositoryAPI (#29101)
---
.../repository/GovernanceRepositoryAPI.java | 5 +++--
.../repository/GovernanceRepositoryAPIImpl.java | 13 ++++++-----
.../service/InventoryIncrementalJobManager.java | 26 ----------------------
.../migration/prepare/MigrationJobPreparer.java | 8 +++----
.../service/GovernanceRepositoryAPIImplTest.java | 17 +++++++-------
5 files changed, 22 insertions(+), 47 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
index 61502c65757..2a9e0447fbb 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
@@ -17,6 +17,7 @@
package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -44,7 +45,7 @@ public interface GovernanceRepositoryAPI {
* @param jobId job id
* @param jobOffsetInfo job offset info
*/
- void persistJobOffsetInfo(String jobId, String jobOffsetInfo);
+ void persistJobOffsetInfo(String jobId, JobOffsetInfo jobOffsetInfo);
/**
* Get job offset info.
@@ -52,7 +53,7 @@ public interface GovernanceRepositoryAPI {
* @param jobId job id
* @return job offset info
*/
- Optional<String> getJobOffsetInfo(String jobId);
+ JobOffsetInfo getJobOffsetInfo(String jobId);
/**
* Persist job item progress.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
index d1da0da09ac..9071709e745 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
@@ -20,6 +20,9 @@ package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
@@ -54,14 +57,14 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
}
@Override
- public void persistJobOffsetInfo(final String jobId, final String
jobOffsetInfo) {
- repository.persist(PipelineMetaDataNode.getJobOffsetPath(jobId),
jobOffsetInfo);
+ public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo
jobOffsetInfo) {
+ repository.persist(PipelineMetaDataNode.getJobOffsetPath(jobId),
YamlEngine.marshal(new
YamlJobOffsetInfoSwapper().swapToYamlConfiguration(jobOffsetInfo)));
}
@Override
- public Optional<String> getJobOffsetInfo(final String jobId) {
- String text =
repository.getDirectly(PipelineMetaDataNode.getJobOffsetPath(jobId));
- return Strings.isNullOrEmpty(text) ? Optional.empty() :
Optional.of(text);
+ public JobOffsetInfo getJobOffsetInfo(final String jobId) {
+ String value =
repository.getDirectly(PipelineMetaDataNode.getJobOffsetPath(jobId));
+ return new
YamlJobOffsetInfoSwapper().swapToObject(Strings.isNullOrEmpty(value) ? new
YamlJobOffsetInfo() : YamlEngine.unmarshal(value, YamlJobOffsetInfo.class));
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
index 54ee4ee19ec..4e754a5d453 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
@@ -24,15 +24,11 @@ import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelinePro
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
import
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@@ -121,26 +117,4 @@ public final class InventoryIncrementalJobManager {
map.put(each, jobItemProgress.orElse(null));
}, LinkedHashMap::putAll);
}
-
- /**
- * Persist job offset info.
- *
- * @param jobId job ID
- * @param jobOffsetInfo job offset info
- */
- public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo
jobOffsetInfo) {
- String value = YamlEngine.marshal(new
YamlJobOffsetInfoSwapper().swapToYamlConfiguration(jobOffsetInfo));
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobOffsetInfo(jobId,
value);
- }
-
- /**
- * Get job offset info.
- *
- * @param jobId job ID
- * @return job offset progress
- */
- public JobOffsetInfo getJobOffsetInfo(final String jobId) {
- Optional<String> offsetInfo =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId);
- return new
YamlJobOffsetInfoSwapper().swapToObject(offsetInfo.isPresent() ?
YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class) : new
YamlJobOffsetInfo());
- }
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 358f3405180..8c46fd1761a 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -45,7 +45,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
@@ -85,8 +85,6 @@ public final class MigrationJobPreparer {
private final PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
- private final InventoryIncrementalJobManager
inventoryIncrementalJobManager = new InventoryIncrementalJobManager(jobAPI);
-
/**
* Do prepare work.
*
@@ -136,12 +134,12 @@ public final class MigrationJobPreparer {
if (lockContext.tryLock(lockDefinition, 600000)) {
log.info("try lock success, jobId={}, shardingItem={}, cost {}
ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
try {
- JobOffsetInfo offsetInfo =
inventoryIncrementalJobManager.getJobOffsetInfo(jobId);
+ JobOffsetInfo offsetInfo =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId);
if (!offsetInfo.isTargetSchemaTableCreated()) {
jobItemContext.setStatus(JobStatus.PREPARING);
jobItemManager.updateStatus(jobId,
jobItemContext.getShardingItem(), JobStatus.PREPARING);
prepareAndCheckTarget(jobItemContext);
- inventoryIncrementalJobManager.persistJobOffsetInfo(jobId,
new JobOffsetInfo(true));
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobOffsetInfo(jobId,
new JobOffsetInfo(true));
}
} finally {
log.info("unlock, jobId={}, shardingItem={}, cost {} ms",
jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
index dc881b3dd15..60e31f0811c 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
@@ -17,13 +17,14 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.job.service;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
@@ -48,10 +49,10 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
class GovernanceRepositoryAPIImplTest {
@@ -150,11 +151,9 @@ class GovernanceRepositoryAPIImplTest {
@Test
void assertPersistJobOffsetInfo() {
- assertFalse(governanceRepositoryAPI.getJobOffsetInfo("1").isPresent());
- governanceRepositoryAPI.persistJobOffsetInfo("1", "testValue");
- Optional<String> actual =
governanceRepositoryAPI.getJobOffsetInfo("1");
- assertTrue(actual.isPresent());
- assertThat(actual.get(), is("testValue"));
+
assertFalse(governanceRepositoryAPI.getJobOffsetInfo("1").isTargetSchemaTableCreated());
+ governanceRepositoryAPI.persistJobOffsetInfo("1", new
JobOffsetInfo(true));
+
assertTrue(governanceRepositoryAPI.getJobOffsetInfo("1").isTargetSchemaTableCreated());
}
@Test