This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c40a9c5856c Refactor AbstractInseparablePipelineJob (#32747)
c40a9c5856c is described below
commit c40a9c5856c707fe2be05b925fe557dc9e036ccf
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 31 20:08:03 2024 +0800
Refactor AbstractInseparablePipelineJob (#32747)
---
.../data/pipeline/core/job/AbstractInseparablePipelineJob.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index d1949182d9b..bf320c580ac 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -60,16 +60,16 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobConfig
public final void execute(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
log.info("Execute job {}", jobId);
- if (jobRunnerManager.isStopping()) {
- log.info("Job is stopping, ignore.");
- return;
- }
PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
T jobConfig = (T)
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
Collection<I> jobItemContexts = new LinkedList<>();
PipelineJobItemManager<P> jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
for (int shardingItem = 0; shardingItem <
jobConfig.getJobShardingCount(); shardingItem++) {
+ if (jobRunnerManager.isStopping()) {
+ log.info("Job is stopping, ignore.");
+ return;
+ }
P jobItemProgress =
jobItemManager.getProgress(shardingContext.getJobName(),
shardingItem).orElse(null);
I jobItemContext = buildJobItemContext(jobConfig, shardingItem,
jobItemProgress);
if (!jobRunnerManager.addTasksRunner(shardingItem,
buildTasksRunner(jobItemContext))) {