This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d81dbb74399 Use PersistRepository.update to update pipeline job item progress (#26794)
d81dbb74399 is described below
commit d81dbb74399b45176e829bcf20a59d361630064d
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Jul 6 20:33:41 2023 +0800
Use PersistRepository.update to update pipeline job item progress (#26794)
* Add GovernanceRepositoryAPI.updateJobItemProgress
* Add PipelineJobAPI.updateJobItemProgress
* Use updateJobItemProgress to replace persistJobItemProgress in updateJobItemStatus
* Use updateJobItemProgress to replace persistJobItemProgress in PipelineJobProgressPersistService
* Simplify AbstractInventoryIncrementalJobAPIImpl.showProcessConfiguration impl
* Remove AbstractInventoryIncrementalJobAPIImpl.getTargetDatabaseType
---
.../repository/GovernanceRepositoryAPI.java | 9 +++++++++
.../repository/GovernanceRepositoryAPIImpl.java | 5 +++++
.../persist/PipelineJobProgressPersistService.java | 2 +-
.../pipeline/core/job/service/PipelineJobAPI.java | 7 +++++++
.../AbstractInventoryIncrementalJobAPIImpl.java | 23 +++++++++++++---------
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 5 -----
.../api/impl/ConsistencyCheckJobAPI.java | 17 ++++++++++++----
.../migration/api/impl/MigrationJobAPI.java | 5 -----
.../service/GovernanceRepositoryAPIImplTest.java | 12 ++++++++---
9 files changed, 58 insertions(+), 27 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
index c282ac15ad8..813b169c9f9 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
@@ -64,6 +64,15 @@ public interface GovernanceRepositoryAPI {
*/
void persistJobItemProgress(String jobId, int shardingItem, String
progressValue);
+ /**
+ * Update job item progress.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @param progressValue progress value
+ */
+ void updateJobItemProgress(String jobId, int shardingItem, String
progressValue);
+
/**
* Get job item progress.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
index 8a73ddde18a..5250a7d8dd4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
@@ -70,6 +70,11 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
repository.persist(PipelineMetaDataNode.getJobOffsetItemPath(jobId,
shardingItem), progressValue);
}
+ @Override
+ public void updateJobItemProgress(final String jobId, final int
shardingItem, final String progressValue) {
+ repository.update(PipelineMetaDataNode.getJobOffsetItemPath(jobId,
shardingItem), progressValue);
+ }
+
@Override
public Optional<String> getJobItemProgress(final String jobId, final int
shardingItem) {
String text =
repository.getDirectly(PipelineMetaDataNode.getJobOffsetItemPath(jobId,
shardingItem));
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 113e7f62a25..d2f2570d816 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -111,7 +111,7 @@ public final class PipelineJobProgressPersistService {
}
persistContext.getHasNewEvents().set(false);
long startTimeMillis = System.currentTimeMillis();
- TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobId).getTypeName()).persistJobItemProgress(jobItemContext.get());
+ TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobId).getTypeName()).updateJobItemProgress(jobItemContext.get());
persistContext.getBeforePersistingProgressMillis().set(null);
if (6 == ThreadLocalRandom.current().nextInt(100)) {
log.info("persist, jobId={}, shardingItem={}, cost {} ms",
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 4db8b156048..d26aad6bb46 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -127,6 +127,13 @@ public interface PipelineJobAPI extends TypedSPI {
*/
void persistJobItemProgress(PipelineJobItemContext jobItemContext);
+ /**
+ * Update job item progress.
+ *
+ * @param jobItemContext job item context
+ */
+ void updateJobItemProgress(PipelineJobItemContext jobItemContext);
+
/**
* Get job item progress.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 54bd0d64cfd..3bffe016146 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -82,8 +82,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl
extends AbstractPip
private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new
YamlJobOffsetInfoSwapper();
- protected abstract String getTargetDatabaseType(PipelineJobConfiguration
pipelineJobConfig);
-
@Override
public abstract InventoryIncrementalProcessContext
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
@@ -95,9 +93,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl
extends AbstractPip
@Override
public PipelineProcessConfiguration showProcessConfiguration(final
PipelineContextKey contextKey) {
- PipelineProcessConfiguration result =
processConfigPersistService.load(contextKey, getJobType());
- result =
PipelineProcessConfigurationUtils.convertWithDefaultValue(result);
- return result;
+ return
PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey,
getJobType()));
}
@Override
@@ -143,6 +139,11 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
@Override
public void persistJobItemProgress(final PipelineJobItemContext
jobItemContext) {
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+ .persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
+ }
+
+ private String convertJobItemProgress(final PipelineJobItemContext
jobItemContext) {
InventoryIncrementalJobItemContext context =
(InventoryIncrementalJobItemContext) jobItemContext;
InventoryIncrementalJobItemProgress jobItemProgress = new
InventoryIncrementalJobItemProgress();
jobItemProgress.setStatus(context.getStatus());
@@ -152,9 +153,13 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
jobItemProgress.setInventoryRecordsCount(context.getInventoryRecordsCount());
- String value =
YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress));
- String jobId = context.getJobId();
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId,
context.getShardingItem(), value);
+ return
YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress));
+ }
+
+ @Override
+ public void updateJobItemProgress(final PipelineJobItemContext
jobItemContext) {
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+ .updateJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
}
private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final
Collection<PipelineTask> incrementalTasks) {
@@ -198,7 +203,7 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
return;
}
jobItemProgress.get().setStatus(status);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId,
shardingItem,
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
shardingItem,
YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress.get())));
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index bb9c398c4fd..5cf4dc72014 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -377,11 +377,6 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
throw new UnsupportedOperationException();
}
- @Override
- protected String getTargetDatabaseType(final PipelineJobConfiguration
pipelineJobConfig) {
- throw new UnsupportedOperationException();
- }
-
@Override
protected String getJobClassName() {
return CDCJob.class.getName();
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 13ea21d393c..538285694e7 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -139,6 +139,11 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
@Override
public void persistJobItemProgress(final PipelineJobItemContext
jobItemContext) {
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+ .persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
+ }
+
+ private String convertJobItemProgress(final PipelineJobItemContext
jobItemContext) {
ConsistencyCheckJobItemContext context =
(ConsistencyCheckJobItemContext) jobItemContext;
ConsistencyCheckJobItemProgressContext progressContext =
context.getProgressContext();
String tableNames = String.join(",", progressContext.getTableNames());
@@ -146,9 +151,13 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
ConsistencyCheckJobItemProgress jobItemProgress = new
ConsistencyCheckJobItemProgress(tableNames, ignoredTableNames,
progressContext.getCheckedRecordsCount().get(),
progressContext.getRecordsCount(),
progressContext.getCheckBeginTimeMillis(),
progressContext.getCheckEndTimeMillis(),
progressContext.getTableCheckPositions());
jobItemProgress.setStatus(context.getStatus());
- YamlConsistencyCheckJobItemProgress yamlJobProgress =
swapper.swapToYamlConfiguration(jobItemProgress);
- String jobId = context.getJobId();
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId,
context.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
+ return
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress));
+ }
+
+ @Override
+ public void updateJobItemProgress(final PipelineJobItemContext
jobItemContext) {
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+ .updateJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
}
@Override
@@ -165,7 +174,7 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
return;
}
jobItemProgress.get().setStatus(status);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId,
shardingItem,
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
shardingItem,
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 52e745047eb..e9e3f4a007a 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -262,11 +262,6 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
return new
YamlMigrationJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
}
- @Override
- protected String getTargetDatabaseType(final PipelineJobConfiguration
pipelineJobConfig) {
- return ((MigrationJobConfiguration)
pipelineJobConfig).getTargetDatabaseType();
- }
-
@Override
public MigrationTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration)
pipelineJobConfig;
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
index 82c51a5c72d..cef3a6aebab 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
@@ -80,12 +80,18 @@ class GovernanceRepositoryAPIImplTest {
}
@Test
- void assertPersistJobProgress() {
+ void assertPersistJobItemProgress() {
MigrationJobItemContext jobItemContext = mockJobItemContext();
-
governanceRepositoryAPI.persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue");
+
governanceRepositoryAPI.updateJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue1");
+
assertFalse(governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem()).isPresent());
+
governanceRepositoryAPI.persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue1");
Optional<String> actual =
governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
assertTrue(actual.isPresent());
- assertThat(actual.get(), is("testValue"));
+ assertThat(actual.get(), is("testValue1"));
+
governanceRepositoryAPI.updateJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), "testValue2");
+ actual =
governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+ assertTrue(actual.isPresent());
+ assertThat(actual.get(), is("testValue2"));
}
@Test