This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 972772743fb Refactor AbstractInseparablePipelineJob (#32743)
972772743fb is described below
commit 972772743fb49463ecdaf47aa3b264dcd8740a17
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 31 14:59:09 2024 +0800
Refactor AbstractInseparablePipelineJob (#32743)
---
.../core/job/AbstractInseparablePipelineJob.java | 26 +++++++++++-----------
.../core/job/service/PipelineJobItemManager.java | 7 ++----
2 files changed, 15 insertions(+), 18 deletions(-)
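For readers skimming the diff below: the commit applies two small patterns, (1) resolve the pipeline governance facade once and pass it down instead of re-resolving it on every call, and (2) replace the hand-written PipelineJobItemManager constructor with Lombok's @RequiredArgsConstructor. A minimal standalone sketch of both follows (ItemManager and ErrorMessageStore are hypothetical stand-ins, not ShardingSphere APIs; assumes Lombok on the classpath):

    import lombok.RequiredArgsConstructor;

    // Lombok generates the single-argument constructor for the final field, so a
    // hand-written, @SuppressWarnings-annotated constructor can be dropped.
    @RequiredArgsConstructor
    final class ItemManager {

        private final ErrorMessageStore errorMessageStore;

        // The store (a stand-in for the governance facade) is created once by the caller
        // and reused across iterations instead of being looked up inside the loop.
        void startItems(final String jobId, final int shardingCount) {
            for (int shardingItem = 0; shardingItem < shardingCount; shardingItem++) {
                errorMessageStore.clean(jobId, shardingItem);
            }
        }
    }

    interface ErrorMessageStore {

        void clean(String jobId, int shardingItem);
    }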
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index bfeb843e569..48aec69d15e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -29,9 +29,9 @@ import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -65,20 +65,21 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
         PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
         T jobConfig = (T) jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         Collection<I> jobItemContexts = new LinkedList<>();
+        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
         for (int shardingItem = 0; shardingItem < jobConfig.getJobShardingCount(); shardingItem++) {
             I jobItemContext = buildJobItemContext(jobConfig, shardingItem);
             if (!jobRunnerManager.addTasksRunner(shardingItem, buildTasksRunner(jobItemContext))) {
                 continue;
             }
             jobItemContexts.add(jobItemContext);
-            PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
-            log.info("Start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
+            governanceFacade.getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
+            log.info("Start tasks runner, jobId={}, shardingItem={}.", jobId, shardingItem);
         }
         if (jobItemContexts.isEmpty()) {
-            log.warn("Job item contexts is empty, ignore");
+            log.warn("Job item contexts are empty, ignore.");
             return;
         }
-        prepare(jobItemContexts);
+        prepare(jobItemContexts, governanceFacade);
         executeInventoryTasks(jobType, jobItemContexts);
         executeIncrementalTasks(jobType, jobItemContexts);
     }
@@ -87,14 +88,14 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
     
     protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
     
-    private void prepare(final Collection<I> jobItemContexts) {
+    private void prepare(final Collection<I> jobItemContexts, final PipelineGovernanceFacade governanceFacade) {
         try {
             doPrepare(jobItemContexts);
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
             for (PipelineJobItemContext each : jobItemContexts) {
-                processFailed(each.getJobId(), each.getShardingItem(), ex);
+                processFailed(each.getJobId(), each.getShardingItem(), ex, governanceFacade);
             }
             throw ex;
         }
@@ -102,9 +103,9 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
     
     protected abstract void doPrepare(Collection<I> jobItemContexts);
     
-    private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
-        log.error("Job execution failed, {}-{}", jobId, shardingItem, ex);
-        PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
+    private void processFailed(final String jobId, final int shardingItem, final Exception ex, final PipelineGovernanceFacade governanceFacade) {
+        log.error("Job {}-{} execution failed.", jobId, shardingItem, ex);
+        governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
         PipelineJobRegistry.stop(jobId);
         processFailed(jobId);
     }
@@ -132,7 +133,7 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
         for (I each : jobItemContexts) {
             if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
-                log.info("job status already EXECUTE_INCREMENTAL_TASK, ignore");
+                log.info("Job status has already EXECUTE_INCREMENTAL_TASK, ignore.");
                 return;
             }
             updateJobItemStatus(each, jobType, JobStatus.EXECUTE_INCREMENTAL_TASK);
@@ -148,8 +149,7 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
     
     private void updateJobItemStatus(final I jobItemContext, final PipelineJobType jobType, final JobStatus jobStatus) {
         jobItemContext.setStatus(jobStatus);
-        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
-        jobItemManager.updateStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
+        new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()).updateStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
     }
     
     protected abstract ExecuteCallback buildExecuteCallback(String identifier, I jobItemContext);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
index 696afff8333..0269b5ee5cb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.service;
 
+import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
@@ -33,15 +34,11 @@ import java.util.Optional;
  *
  * @param <T> type of pipeline job item progress
  */
+@RequiredArgsConstructor
 public final class PipelineJobItemManager<T extends PipelineJobItemProgress> {
     
     private final YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T> swapper;
     
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public PipelineJobItemManager(final YamlPipelineJobItemProgressSwapper swapper) {
-        this.swapper = swapper;
-    }
-    
     /**
      * Update job item status.
      *