This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 972772743fb Refactor AbstractInseparablePipelineJob (#32743)
972772743fb is described below

commit 972772743fb49463ecdaf47aa3b264dcd8740a17
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 31 14:59:09 2024 +0800

    Refactor AbstractInseparablePipelineJob (#32743)
---
 .../core/job/AbstractInseparablePipelineJob.java   | 26 +++++++++++-----------
 .../core/job/service/PipelineJobItemManager.java   |  7 ++----
 2 files changed, 15 insertions(+), 18 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index bfeb843e569..48aec69d15e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -29,9 +29,9 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -65,20 +65,21 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
         PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
         T jobConfig = (T) 
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         Collection<I> jobItemContexts = new LinkedList<>();
+        PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
         for (int shardingItem = 0; shardingItem < 
jobConfig.getJobShardingCount(); shardingItem++) {
             I jobItemContext = buildJobItemContext(jobConfig, shardingItem);
             if (!jobRunnerManager.addTasksRunner(shardingItem, 
buildTasksRunner(jobItemContext))) {
                 continue;
             }
             jobItemContexts.add(jobItemContext);
-            
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId,
 shardingItem);
-            log.info("Start tasks runner, jobId={}, shardingItem={}", jobId, 
shardingItem);
+            governanceFacade.getJobItemFacade().getErrorMessage().clean(jobId, 
shardingItem);
+            log.info("Start tasks runner, jobId={}, shardingItem={}.", jobId, 
shardingItem);
         }
         if (jobItemContexts.isEmpty()) {
-            log.warn("Job item contexts is empty, ignore");
+            log.warn("Job item contexts are empty, ignore.");
             return;
         }
-        prepare(jobItemContexts);
+        prepare(jobItemContexts, governanceFacade);
         executeInventoryTasks(jobType, jobItemContexts);
         executeIncrementalTasks(jobType, jobItemContexts);
     }
@@ -87,14 +88,14 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
     
     protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
     
-    private void prepare(final Collection<I> jobItemContexts) {
+    private void prepare(final Collection<I> jobItemContexts, final 
PipelineGovernanceFacade governanceFacade) {
         try {
             doPrepare(jobItemContexts);
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
             for (PipelineJobItemContext each : jobItemContexts) {
-                processFailed(each.getJobId(), each.getShardingItem(), ex);
+                processFailed(each.getJobId(), each.getShardingItem(), ex, 
governanceFacade);
             }
             throw ex;
         }
@@ -102,9 +103,9 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
     
     protected abstract void doPrepare(Collection<I> jobItemContexts);
     
-    private void processFailed(final String jobId, final int shardingItem, 
final Exception ex) {
-        log.error("Job execution failed, {}-{}", jobId, shardingItem, ex);
-        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
 shardingItem, ex);
+    private void processFailed(final String jobId, final int shardingItem, 
final Exception ex, final PipelineGovernanceFacade governanceFacade) {
+        log.error("Job {}-{} execution failed.", jobId, shardingItem, ex);
+        governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, 
shardingItem, ex);
         PipelineJobRegistry.stop(jobId);
         processFailed(jobId);
     }
@@ -132,7 +133,7 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
         for (I each : jobItemContexts) {
             if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
-                log.info("job status already EXECUTE_INCREMENTAL_TASK, 
ignore");
+                log.info("Job status has already EXECUTE_INCREMENTAL_TASK, 
ignore.");
                 return;
             }
             updateJobItemStatus(each, jobType, 
JobStatus.EXECUTE_INCREMENTAL_TASK);
@@ -148,8 +149,7 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
     
     private void updateJobItemStatus(final I jobItemContext, final 
PipelineJobType jobType, final JobStatus jobStatus) {
         jobItemContext.setStatus(jobStatus);
-        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
-        jobItemManager.updateStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
+        new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()).updateStatus(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), jobStatus);
     }
     
     protected abstract ExecuteCallback buildExecuteCallback(String identifier, 
I jobItemContext);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
index 696afff8333..0269b5ee5cb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.service;
 
+import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
@@ -33,15 +34,11 @@ import java.util.Optional;
  * 
  * @param <T> type of pipeline job item progress
  */
+@RequiredArgsConstructor
 public final class PipelineJobItemManager<T extends PipelineJobItemProgress> {
     
     private final 
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T> 
swapper;
     
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public PipelineJobItemManager(final YamlPipelineJobItemProgressSwapper 
swapper) {
-        this.swapper = swapper;
-    }
-    
     /**
      * Update job item status.
      *

Reply via email to