This is an automated email from the ASF dual-hosted git repository.

chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 93ffce1787c Refactor GovernanceRepositoryAPI (#29101)
93ffce1787c is described below

commit 93ffce1787cf1a4bc08f3f4966c88d64029f6193
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Nov 21 13:22:51 2023 +0800

    Refactor GovernanceRepositoryAPI (#29101)
---
 .../repository/GovernanceRepositoryAPI.java        |  5 +++--
 .../repository/GovernanceRepositoryAPIImpl.java    | 13 ++++++-----
 .../service/InventoryIncrementalJobManager.java    | 26 ----------------------
 .../migration/prepare/MigrationJobPreparer.java    |  8 +++----
 .../service/GovernanceRepositoryAPIImplTest.java   | 17 +++++++-------
 5 files changed, 22 insertions(+), 47 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
index 61502c65757..2a9e0447fbb 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
 
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
@@ -44,7 +45,7 @@ public interface GovernanceRepositoryAPI {
      * @param jobId job id
      * @param jobOffsetInfo job offset info
      */
-    void persistJobOffsetInfo(String jobId, String jobOffsetInfo);
+    void persistJobOffsetInfo(String jobId, JobOffsetInfo jobOffsetInfo);
     
     /**
      * Get job offset info.
@@ -52,7 +53,7 @@ public interface GovernanceRepositoryAPI {
      * @param jobId job id
      * @return job offset info
      */
-    Optional<String> getJobOffsetInfo(String jobId);
+    JobOffsetInfo getJobOffsetInfo(String jobId);
     
     /**
      * Persist job item progress.
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
index d1da0da09ac..9071709e745 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
@@ -20,6 +20,9 @@ package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository
 import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
@@ -54,14 +57,14 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
     }
     
     @Override
-    public void persistJobOffsetInfo(final String jobId, final String 
jobOffsetInfo) {
-        repository.persist(PipelineMetaDataNode.getJobOffsetPath(jobId), 
jobOffsetInfo);
+    public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo 
jobOffsetInfo) {
+        repository.persist(PipelineMetaDataNode.getJobOffsetPath(jobId), 
YamlEngine.marshal(new 
YamlJobOffsetInfoSwapper().swapToYamlConfiguration(jobOffsetInfo)));
     }
     
     @Override
-    public Optional<String> getJobOffsetInfo(final String jobId) {
-        String text = 
repository.getDirectly(PipelineMetaDataNode.getJobOffsetPath(jobId));
-        return Strings.isNullOrEmpty(text) ? Optional.empty() : 
Optional.of(text);
+    public JobOffsetInfo getJobOffsetInfo(final String jobId) {
+        String value = 
repository.getDirectly(PipelineMetaDataNode.getJobOffsetPath(jobId));
+        return new 
YamlJobOffsetInfoSwapper().swapToObject(Strings.isNullOrEmpty(value) ? new 
YamlJobOffsetInfo() : YamlEngine.unmarshal(value, YamlJobOffsetInfo.class));
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
index 54ee4ee19ec..4e754a5d453 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
@@ -24,15 +24,11 @@ import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelinePro
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -121,26 +117,4 @@ public final class InventoryIncrementalJobManager {
             map.put(each, jobItemProgress.orElse(null));
         }, LinkedHashMap::putAll);
     }
-    
-    /**
-     * Persist job offset info.
-     *
-     * @param jobId job ID
-     * @param jobOffsetInfo job offset info
-     */
-    public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo 
jobOffsetInfo) {
-        String value = YamlEngine.marshal(new 
YamlJobOffsetInfoSwapper().swapToYamlConfiguration(jobOffsetInfo));
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobOffsetInfo(jobId,
 value);
-    }
-    
-    /**
-     * Get job offset info.
-     *
-     * @param jobId job ID
-     * @return job offset progress
-     */
-    public JobOffsetInfo getJobOffsetInfo(final String jobId) {
-        Optional<String> offsetInfo = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId);
-        return new 
YamlJobOffsetInfoSwapper().swapToObject(offsetInfo.isPresent() ? 
YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class) : new 
YamlJobOffsetInfo());
-    }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 358f3405180..8c46fd1761a 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -45,7 +45,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
@@ -85,8 +85,6 @@ public final class MigrationJobPreparer {
     
     private final PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
     
-    private final InventoryIncrementalJobManager 
inventoryIncrementalJobManager = new InventoryIncrementalJobManager(jobAPI);
-    
     /**
      * Do prepare work.
      *
@@ -136,12 +134,12 @@ public final class MigrationJobPreparer {
         if (lockContext.tryLock(lockDefinition, 600000)) {
             log.info("try lock success, jobId={}, shardingItem={}, cost {} 
ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - 
startTimeMillis);
             try {
-                JobOffsetInfo offsetInfo = 
inventoryIncrementalJobManager.getJobOffsetInfo(jobId);
+                JobOffsetInfo offsetInfo = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId);
                 if (!offsetInfo.isTargetSchemaTableCreated()) {
                     jobItemContext.setStatus(JobStatus.PREPARING);
                     jobItemManager.updateStatus(jobId, 
jobItemContext.getShardingItem(), JobStatus.PREPARING);
                     prepareAndCheckTarget(jobItemContext);
-                    inventoryIncrementalJobManager.persistJobOffsetInfo(jobId, 
new JobOffsetInfo(true));
+                    
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobOffsetInfo(jobId,
 new JobOffsetInfo(true));
                 }
             } finally {
                 log.info("unlock, jobId={}, shardingItem={}, cost {} ms", 
jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - 
startTimeMillis);
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
index dc881b3dd15..60e31f0811c 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
@@ -17,13 +17,14 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.job.service;
 
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
 import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
@@ -48,10 +49,10 @@ import java.util.concurrent.atomic.AtomicReference;
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.mock;
 
 class GovernanceRepositoryAPIImplTest {
@@ -150,11 +151,9 @@ class GovernanceRepositoryAPIImplTest {
     
     @Test
     void assertPersistJobOffsetInfo() {
-        assertFalse(governanceRepositoryAPI.getJobOffsetInfo("1").isPresent());
-        governanceRepositoryAPI.persistJobOffsetInfo("1", "testValue");
-        Optional<String> actual = 
governanceRepositoryAPI.getJobOffsetInfo("1");
-        assertTrue(actual.isPresent());
-        assertThat(actual.get(), is("testValue"));
+        
assertFalse(governanceRepositoryAPI.getJobOffsetInfo("1").isTargetSchemaTableCreated());
+        governanceRepositoryAPI.persistJobOffsetInfo("1", new 
JobOffsetInfo(true));
+        
assertTrue(governanceRepositoryAPI.getJobOffsetInfo("1").isTargetSchemaTableCreated());
     }
     
     @Test

Reply via email to