This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d81dbb74399 Use PersistRepository.update to update pipeline job item 
progress (#26794)
d81dbb74399 is described below

commit d81dbb74399b45176e829bcf20a59d361630064d
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Jul 6 20:33:41 2023 +0800

    Use PersistRepository.update to update pipeline job item progress (#26794)
    
    * Add GovernanceRepositoryAPI.updateJobItemProgress
    
    * Add PipelineJobAPI.updateJobItemProgress
    
    * Use updateJobItemProgress to replace persistJobItemProgress in 
updateJobItemStatus
    
    * Use updateJobItemProgress to replace persistJobItemProgress in 
PipelineJobProgressPersistService
    
    * Simplify AbstractInventoryIncrementalJobAPIImpl.showProcessConfiguration 
impl
    
    * Remove AbstractInventoryIncrementalJobAPIImpl.getTargetDatabaseType
---
 .../repository/GovernanceRepositoryAPI.java        |  9 +++++++++
 .../repository/GovernanceRepositoryAPIImpl.java    |  5 +++++
 .../persist/PipelineJobProgressPersistService.java |  2 +-
 .../pipeline/core/job/service/PipelineJobAPI.java  |  7 +++++++
 .../AbstractInventoryIncrementalJobAPIImpl.java    | 23 +++++++++++++---------
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  5 -----
 .../api/impl/ConsistencyCheckJobAPI.java           | 17 ++++++++++++----
 .../migration/api/impl/MigrationJobAPI.java        |  5 -----
 .../service/GovernanceRepositoryAPIImplTest.java   | 12 ++++++++---
 9 files changed, 58 insertions(+), 27 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
index c282ac15ad8..813b169c9f9 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
@@ -64,6 +64,15 @@ public interface GovernanceRepositoryAPI {
      */
     void persistJobItemProgress(String jobId, int shardingItem, String 
progressValue);
     
+    /**
+     * Update job item progress.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param progressValue progress value
+     */
+    void updateJobItemProgress(String jobId, int shardingItem, String 
progressValue);
+    
     /**
      * Get job item progress.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
index 8a73ddde18a..5250a7d8dd4 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
@@ -70,6 +70,11 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
         repository.persist(PipelineMetaDataNode.getJobOffsetItemPath(jobId, 
shardingItem), progressValue);
     }
     
+    @Override
+    public void updateJobItemProgress(final String jobId, final int 
shardingItem, final String progressValue) {
+        repository.update(PipelineMetaDataNode.getJobOffsetItemPath(jobId, 
shardingItem), progressValue);
+    }
+    
     @Override
     public Optional<String> getJobItemProgress(final String jobId, final int 
shardingItem) {
         String text = 
repository.getDirectly(PipelineMetaDataNode.getJobOffsetItemPath(jobId, 
shardingItem));
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 113e7f62a25..d2f2570d816 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -111,7 +111,7 @@ public final class PipelineJobProgressPersistService {
             }
             persistContext.getHasNewEvents().set(false);
             long startTimeMillis = System.currentTimeMillis();
-            TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobId).getTypeName()).persistJobItemProgress(jobItemContext.get());
+            TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobId).getTypeName()).updateJobItemProgress(jobItemContext.get());
             persistContext.getBeforePersistingProgressMillis().set(null);
             if (6 == ThreadLocalRandom.current().nextInt(100)) {
                 log.info("persist, jobId={}, shardingItem={}, cost {} ms", 
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 4db8b156048..d26aad6bb46 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -127,6 +127,13 @@ public interface PipelineJobAPI extends TypedSPI {
      */
     void persistJobItemProgress(PipelineJobItemContext jobItemContext);
     
+    /**
+     * Update job item progress.
+     *
+     * @param jobItemContext job item context
+     */
+    void updateJobItemProgress(PipelineJobItemContext jobItemContext);
+    
     /**
      * Get job item progress.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 54bd0d64cfd..3bffe016146 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -82,8 +82,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl 
extends AbstractPip
     
     private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new 
YamlJobOffsetInfoSwapper();
     
-    protected abstract String getTargetDatabaseType(PipelineJobConfiguration 
pipelineJobConfig);
-    
     @Override
     public abstract InventoryIncrementalProcessContext 
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
     
@@ -95,9 +93,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl 
extends AbstractPip
     
     @Override
     public PipelineProcessConfiguration showProcessConfiguration(final 
PipelineContextKey contextKey) {
-        PipelineProcessConfiguration result = 
processConfigPersistService.load(contextKey, getJobType());
-        result = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(result);
-        return result;
+        return 
PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey,
 getJobType()));
     }
     
     @Override
@@ -143,6 +139,11 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
     
     @Override
     public void persistJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+                .persistJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
+    }
+    
+    private String convertJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
         InventoryIncrementalJobItemContext context = 
(InventoryIncrementalJobItemContext) jobItemContext;
         InventoryIncrementalJobItemProgress jobItemProgress = new 
InventoryIncrementalJobItemProgress();
         jobItemProgress.setStatus(context.getStatus());
@@ -152,9 +153,13 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
         
jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
         
jobItemProgress.setInventoryRecordsCount(context.getInventoryRecordsCount());
-        String value = 
YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress));
-        String jobId = context.getJobId();
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId,
 context.getShardingItem(), value);
+        return 
YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress));
+    }
+    
+    @Override
+    public void updateJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+                .updateJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
     }
     
     private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final 
Collection<PipelineTask> incrementalTasks) {
@@ -198,7 +203,7 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
             return;
         }
         jobItemProgress.get().setStatus(status);
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId,
 shardingItem,
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
 shardingItem,
                 
YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress.get())));
     }
     
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index bb9c398c4fd..5cf4dc72014 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -377,11 +377,6 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
         throw new UnsupportedOperationException();
     }
     
-    @Override
-    protected String getTargetDatabaseType(final PipelineJobConfiguration 
pipelineJobConfig) {
-        throw new UnsupportedOperationException();
-    }
-    
     @Override
     protected String getJobClassName() {
         return CDCJob.class.getName();
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 13ea21d393c..538285694e7 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -139,6 +139,11 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
     
     @Override
     public void persistJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+                .persistJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
+    }
+    
+    private String convertJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
         ConsistencyCheckJobItemContext context = 
(ConsistencyCheckJobItemContext) jobItemContext;
         ConsistencyCheckJobItemProgressContext progressContext = 
context.getProgressContext();
         String tableNames = String.join(",", progressContext.getTableNames());
@@ -146,9 +151,13 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
         ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress(tableNames, ignoredTableNames, 
progressContext.getCheckedRecordsCount().get(),
                 progressContext.getRecordsCount(), 
progressContext.getCheckBeginTimeMillis(), 
progressContext.getCheckEndTimeMillis(), 
progressContext.getTableCheckPositions());
         jobItemProgress.setStatus(context.getStatus());
-        YamlConsistencyCheckJobItemProgress yamlJobProgress = 
swapper.swapToYamlConfiguration(jobItemProgress);
-        String jobId = context.getJobId();
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId,
 context.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
+        return 
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress));
+    }
+    
+    @Override
+    public void updateJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+                .updateJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
     }
     
     @Override
@@ -165,7 +174,7 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
             return;
         }
         jobItemProgress.get().setStatus(status);
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId,
 shardingItem,
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
 shardingItem,
                 
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 52e745047eb..e9e3f4a007a 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -262,11 +262,6 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
         return new 
YamlMigrationJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
     }
     
-    @Override
-    protected String getTargetDatabaseType(final PipelineJobConfiguration 
pipelineJobConfig) {
-        return ((MigrationJobConfiguration) 
pipelineJobConfig).getTargetDatabaseType();
-    }
-    
     @Override
     public MigrationTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
         MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) 
pipelineJobConfig;
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
index 82c51a5c72d..cef3a6aebab 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
@@ -80,12 +80,18 @@ class GovernanceRepositoryAPIImplTest {
     }
     
     @Test
-    void assertPersistJobProgress() {
+    void assertPersistJobItemProgress() {
         MigrationJobItemContext jobItemContext = mockJobItemContext();
-        
governanceRepositoryAPI.persistJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), "testValue");
+        
governanceRepositoryAPI.updateJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), "testValue1");
+        
assertFalse(governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(),
 jobItemContext.getShardingItem()).isPresent());
+        
governanceRepositoryAPI.persistJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), "testValue1");
         Optional<String> actual = 
governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
         assertTrue(actual.isPresent());
-        assertThat(actual.get(), is("testValue"));
+        assertThat(actual.get(), is("testValue1"));
+        
governanceRepositoryAPI.updateJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), "testValue2");
+        actual = 
governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
+        assertTrue(actual.isPresent());
+        assertThat(actual.get(), is("testValue2"));
     }
     
     @Test

Reply via email to