This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3acec5ec7d5 Add PipelineJobItemContext.toProgress() (#29071)
3acec5ec7d5 is described below
commit 3acec5ec7d54da88071a5b6171e2eb8f310ac4f1
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 18 12:59:15 2023 +0800
Add PipelineJobItemContext.toProgress() (#29071)
---
.../InventoryIncrementalJobItemContext.java | 7 +++-
.../common/context/PipelineJobItemContext.java | 8 +++++
.../progress/ConsistencyCheckJobItemProgress.java | 19 ++++++++--
.../InventoryIncrementalJobItemProgress.java | 42 +++++++++++++++++++---
.../AbstractInventoryIncrementalJobAPIImpl.java | 32 ++---------------
.../api/impl/ConsistencyCheckJobAPI.java | 12 +------
.../context/ConsistencyCheckJobItemContext.java | 7 ++++
7 files changed, 77 insertions(+), 50 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/InventoryIncrementalJobItemContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/InventoryIncrementalJobItemContext.java
index 8a2ca947905..ae0cd2f48c2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/InventoryIncrementalJobItemContext.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/InventoryIncrementalJobItemContext.java
@@ -17,9 +17,9 @@
package org.apache.shardingsphere.data.pipeline.common.context;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressListener;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
@@ -88,4 +88,9 @@ public interface InventoryIncrementalJobItemContext extends
PipelineJobItemConte
* @return inventory records count
*/
long getInventoryRecordsCount();
+
+ @Override
+ default InventoryIncrementalJobItemProgress toProgress() {
+ return new InventoryIncrementalJobItemProgress(this);
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineJobItemContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineJobItemContext.java
index ded73005aed..154aa659438 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineJobItemContext.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineJobItemContext.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.common.context;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
/**
* Pipeline job item context.
@@ -85,4 +86,11 @@ public interface PipelineJobItemContext {
* @return stopping
*/
boolean isStopping();
+
+ /**
+ * Convert to pipeline job item progress.
+ *
+ * @return converted pipeline job item progress
+ */
+ PipelineJobItemProgress toProgress();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
index f43db99d70f..0a1ac665b29 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
@@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import java.util.Map;
@@ -34,9 +35,6 @@ import java.util.Map;
// TODO Refactor structure, List<TableProgress>
public final class ConsistencyCheckJobItemProgress implements
PipelineJobItemProgress {
- @Setter
- private JobStatus status = JobStatus.RUNNING;
-
private final String tableNames;
private final String ignoredTableNames;
@@ -54,4 +52,19 @@ public final class ConsistencyCheckJobItemProgress
implements PipelineJobItemPro
private final Map<String, Object> targetTableCheckPositions;
private final String sourceDatabaseType;
+
+ @Setter
+ private JobStatus status = JobStatus.RUNNING;
+
+ public ConsistencyCheckJobItemProgress(final
ConsistencyCheckJobItemProgressContext context) {
+ tableNames = String.join(",", context.getTableNames());
+ ignoredTableNames = String.join(",", context.getIgnoredTableNames());
+ checkedRecordsCount = context.getCheckedRecordsCount().get();
+ recordsCount = context.getRecordsCount();
+ checkBeginTimeMillis = context.getCheckBeginTimeMillis();
+ checkEndTimeMillis = context.getCheckEndTimeMillis();
+ sourceTableCheckPositions = context.getSourceTableCheckPositions();
+ targetTableCheckPositions = context.getTargetTableCheckPositions();
+ sourceDatabaseType = context.getSourceDatabaseType();
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java
index 043b00b9f43..d0ffdb41df4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java
@@ -18,30 +18,62 @@
package org.apache.shardingsphere.data.pipeline.common.job.progress;
import lombok.Getter;
+import lombok.NoArgsConstructor;
import lombok.Setter;
+import
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
+import
org.apache.shardingsphere.data.pipeline.common.task.progress.InventoryTaskProgress;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* Inventory incremental job item progress.
*/
+@NoArgsConstructor
@Getter
@Setter
public final class InventoryIncrementalJobItemProgress implements
PipelineJobItemProgress {
- private JobStatus status = JobStatus.RUNNING;
-
private DatabaseType sourceDatabaseType;
private String dataSourceName;
- private boolean active;
-
private JobItemInventoryTasksProgress inventory;
private JobItemIncrementalTasksProgress incremental;
+ private long inventoryRecordsCount;
+
private long processedRecordsCount;
- private long inventoryRecordsCount;
+ private boolean active;
+
+ private JobStatus status = JobStatus.RUNNING;
+
+ public InventoryIncrementalJobItemProgress(final
InventoryIncrementalJobItemContext context) {
+ sourceDatabaseType = context.getJobConfig().getSourceDatabaseType();
+ dataSourceName = context.getDataSourceName();
+ inventory = getInventoryTasksProgress(context.getInventoryTasks());
+ incremental =
getIncrementalTasksProgress(context.getIncrementalTasks());
+ inventoryRecordsCount = context.getInventoryRecordsCount();
+ processedRecordsCount = context.getProcessedRecordsCount();
+ status = context.getStatus();
+ }
+
+ private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final
Collection<PipelineTask> incrementalTasks) {
+ return new JobItemIncrementalTasksProgress(incrementalTasks.isEmpty()
? null : (IncrementalTaskProgress)
incrementalTasks.iterator().next().getTaskProgress());
+ }
+
+ private JobItemInventoryTasksProgress getInventoryTasksProgress(final
Collection<PipelineTask> inventoryTasks) {
+ Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new
HashMap<>();
+ for (PipelineTask each : inventoryTasks) {
+ inventoryTaskProgressMap.put(each.getTaskId(),
(InventoryTaskProgress) each.getTaskProgress());
+ }
+ return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 7462f12669e..372971129cc 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -23,13 +23,10 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
-import
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemInventoryTasksProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
@@ -38,8 +35,6 @@ import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobO
import
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
import
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
-import
org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
-import
org.apache.shardingsphere.data.pipeline.common.task.progress.InventoryTaskProgress;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -47,7 +42,6 @@ import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncreme
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
-import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
@@ -55,7 +49,6 @@ import
org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.util.Collection;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -134,35 +127,14 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl implements Inventor
.persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
}
- private String convertJobItemProgress(final PipelineJobItemContext
jobItemContext) {
- InventoryIncrementalJobItemContext context =
(InventoryIncrementalJobItemContext) jobItemContext;
- InventoryIncrementalJobItemProgress jobItemProgress = new
InventoryIncrementalJobItemProgress();
- jobItemProgress.setStatus(context.getStatus());
-
jobItemProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType());
- jobItemProgress.setDataSourceName(context.getDataSourceName());
-
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
-
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
-
jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
-
jobItemProgress.setInventoryRecordsCount(context.getInventoryRecordsCount());
- return
YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress));
- }
-
@Override
public void updateJobItemProgress(final PipelineJobItemContext
jobItemContext) {
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
.updateJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
}
- private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final
Collection<PipelineTask> incrementalTasks) {
- return new JobItemIncrementalTasksProgress(incrementalTasks.isEmpty()
? null : (IncrementalTaskProgress)
incrementalTasks.iterator().next().getTaskProgress());
- }
-
- private JobItemInventoryTasksProgress getInventoryTasksProgress(final
Collection<PipelineTask> inventoryTasks) {
- Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new
HashMap<>();
- for (PipelineTask each : inventoryTasks) {
- inventoryTaskProgressMap.put(each.getTaskId(),
(InventoryTaskProgress) each.getTaskProgress());
- }
- return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
+ private String convertJobItemProgress(final PipelineJobItemContext
jobItemContext) {
+ return
YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration((InventoryIncrementalJobItemProgress)
jobItemContext.toProgress()));
}
@Override
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index e96f1b2e4f3..da0f06e6ed9 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -27,7 +27,6 @@ import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlCons
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
import
org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
@@ -44,7 +43,6 @@ import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.poj
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
-import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -129,15 +127,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
}
private String convertJobItemProgress(final PipelineJobItemContext
jobItemContext) {
- ConsistencyCheckJobItemContext context =
(ConsistencyCheckJobItemContext) jobItemContext;
- ConsistencyCheckJobItemProgressContext progressContext =
context.getProgressContext();
- String tableNames = String.join(",", progressContext.getTableNames());
- String ignoredTableNames = String.join(",",
progressContext.getIgnoredTableNames());
- ConsistencyCheckJobItemProgress jobItemProgress = new
ConsistencyCheckJobItemProgress(tableNames, ignoredTableNames,
progressContext.getCheckedRecordsCount().get(),
- progressContext.getRecordsCount(),
progressContext.getCheckBeginTimeMillis(),
progressContext.getCheckEndTimeMillis(),
- progressContext.getSourceTableCheckPositions(),
progressContext.getTargetTableCheckPositions(),
progressContext.getSourceDatabaseType());
- jobItemProgress.setStatus(context.getStatus());
- return
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress));
+ return
YamlEngine.marshal(swapper.swapToYamlConfiguration((ConsistencyCheckJobItemProgress)
jobItemContext.toProgress()));
}
@Override
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
index 5dd7865398b..30bed68443f 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
@@ -69,4 +69,11 @@ public final class ConsistencyCheckJobItemContext implements
PipelineJobItemCont
public PipelineProcessContext getJobProcessContext() {
return processContext;
}
+
+ @Override
+ public ConsistencyCheckJobItemProgress toProgress() {
+ ConsistencyCheckJobItemProgress result = new
ConsistencyCheckJobItemProgress(progressContext);
+ result.setStatus(status);
+ return result;
+ }
}