This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 750aa707f3d Refactor AbstractInseparablePipelineJob (#32746)
750aa707f3d is described below
commit 750aa707f3d342f4ba96ec7aa2cfc0c85d53b39d
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 31 15:24:55 2024 +0800
Refactor AbstractInseparablePipelineJob (#32746)
---
.../core/job/AbstractInseparablePipelineJob.java | 26 +++++++++++++---------
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 10 +++------
2 files changed, 18 insertions(+), 18 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 48aec69d15e..d1949182d9b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -29,6 +29,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
@@ -45,11 +46,12 @@ import java.util.concurrent.CompletableFuture;
*
* @param <T> type of pipeline job configuration
* @param <I> type of pipeline job item context
+ * @param <P> type of pipeline job item progress
*/
@RequiredArgsConstructor
@Getter
@Slf4j
-public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfiguration, I extends PipelineJobItemContext> implements PipelineJob {
+public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfiguration, I extends PipelineJobItemContext, P extends PipelineJobItemProgress> implements PipelineJob {
private final PipelineJobRunnerManager jobRunnerManager;
@@ -65,9 +67,11 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobConfig
         PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
         T jobConfig = (T) jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         Collection<I> jobItemContexts = new LinkedList<>();
+        PipelineJobItemManager<P> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
         PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
         for (int shardingItem = 0; shardingItem < jobConfig.getJobShardingCount(); shardingItem++) {
-            I jobItemContext = buildJobItemContext(jobConfig, shardingItem);
+            P jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
+            I jobItemContext = buildJobItemContext(jobConfig, shardingItem, jobItemProgress);
             if (!jobRunnerManager.addTasksRunner(shardingItem, buildTasksRunner(jobItemContext))) {
                 continue;
             }
@@ -80,11 +84,11 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobConfig
return;
}
         prepare(jobItemContexts, governanceFacade);
-        executeInventoryTasks(jobType, jobItemContexts);
-        executeIncrementalTasks(jobType, jobItemContexts);
+        executeInventoryTasks(jobItemContexts, jobItemManager);
+        executeIncrementalTasks(jobItemContexts, jobItemManager);
     }
-    protected abstract I buildJobItemContext(T jobConfig, int shardingItem);
+    protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P jobItemProgress);
protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
@@ -112,10 +116,10 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobConfig
protected abstract void processFailed(String jobId);
-    private void executeInventoryTasks(final PipelineJobType jobType, final Collection<I> jobItemContexts) {
+    private void executeInventoryTasks(final Collection<I> jobItemContexts, final PipelineJobItemManager<P> jobItemManager) {
Collection<CompletableFuture<?>> futures = new LinkedList<>();
for (I each : jobItemContexts) {
-            updateJobItemStatus(each, jobType, JobStatus.EXECUTE_INVENTORY_TASK);
+            updateJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK, jobItemManager);
for (PipelineTask task : ((TransmissionJobItemContext)
each).getInventoryTasks()) {
if (task.getTaskProgress().getPosition() instanceof
IngestFinishedPosition) {
continue;
@@ -129,14 +133,14 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobConfig
ExecuteEngine.trigger(futures, buildExecuteCallback("inventory",
jobItemContexts.iterator().next()));
}
-    private void executeIncrementalTasks(final PipelineJobType jobType, final Collection<I> jobItemContexts) {
+    private void executeIncrementalTasks(final Collection<I> jobItemContexts, final PipelineJobItemManager<P> jobItemManager) {
Collection<CompletableFuture<?>> futures = new LinkedList<>();
for (I each : jobItemContexts) {
if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
log.info("Job status has already EXECUTE_INCREMENTAL_TASK,
ignore.");
return;
}
-            updateJobItemStatus(each, jobType, JobStatus.EXECUTE_INCREMENTAL_TASK);
+            updateJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK, jobItemManager);
for (PipelineTask task : ((TransmissionJobItemContext)
each).getIncrementalTasks()) {
if (task.getTaskProgress().getPosition() instanceof
IngestFinishedPosition) {
continue;
@@ -147,9 +151,9 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobConfig
ExecuteEngine.trigger(futures, buildExecuteCallback("incremental",
jobItemContexts.iterator().next()));
}
-    private void updateJobItemStatus(final I jobItemContext, final PipelineJobType jobType, final JobStatus jobStatus) {
+    private void updateJobItemStatus(final I jobItemContext, final JobStatus jobStatus, final PipelineJobItemManager<P> jobItemManager) {
         jobItemContext.setStatus(jobStatus);
-        new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()).updateStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
+        jobItemManager.updateStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
     }
protected abstract ExecuteCallback buildExecuteCallback(String identifier,
I jobItemContext);
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 64c6c8952b8..68ef97f5cae 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -51,7 +51,6 @@ import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -69,12 +68,10 @@ import java.util.stream.Collectors;
* CDC job.
*/
@Slf4j
-public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobConfiguration, CDCJobItemContext> {
+public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobConfiguration, CDCJobItemContext, TransmissionJobItemProgress> {
     private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
-    private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new CDCJobType().getYamlJobItemProgressSwapper());
-
     private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
     private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
@@ -88,13 +85,12 @@ public final class CDCJob extends
AbstractInseparablePipelineJob<CDCJobConfigura
}
@Override
-    protected CDCJobItemContext buildJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
-        Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
+    protected CDCJobItemContext buildJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem, final TransmissionJobItemProgress jobItemProgress) {
         PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
                 processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), "STREAMING"));
         TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
         CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getProcessConfiguration());
-        return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager(), sink);
+        return new CDCJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager(), sink);
     }
private CDCTaskConfiguration buildTaskConfiguration(final
CDCJobConfiguration jobConfig, final int jobShardingItem, final
PipelineProcessConfiguration processConfig) {