This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ee48bc12566 Improve job progress persisting (#30441)
ee48bc12566 is described below

commit ee48bc125664eeb825895f6978ef34f042ba2484
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Mar 9 20:10:51 2024 +0800

    Improve job progress persisting (#30441)
    
    * Improve unhandled event count measuring in job progress persist service
    
    * Try to persist job progress immediately when job item execute successfully
---
 .../core/job/AbstractSeparablePipelineJob.java      |  4 ++++
 .../persist/PipelineJobProgressPersistContext.java  |  6 ++----
 .../persist/PipelineJobProgressPersistService.java  | 21 +++++++++------------
 3 files changed, 15 insertions(+), 16 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 855bd8ed717..03fc042ca74 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -29,6 +29,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
@@ -87,6 +88,9 @@ public abstract class AbstractSeparablePipelineJob<T extends 
PipelineJobConfigur
         boolean started = false;
         try {
             started = execute(buildJobItemContext(jobConfig, shardingItem, 
jobItemProgress, jobProcessContext));
+            if (started) {
+                PipelineJobProgressPersistService.persistNow(jobId, 
shardingItem);
+            }
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
index d91b04a9102..beafba57ba3 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
@@ -21,7 +21,7 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Pipeline job progress persist context.
@@ -34,9 +34,7 @@ public final class PipelineJobProgressPersistContext {
     
     private final int shardingItem;
     
-    private final AtomicBoolean hasNewEvents = new AtomicBoolean(false);
-    
-    private final AtomicReference<Long> beforePersistingProgressMillis = new 
AtomicReference<>(null);
+    private final AtomicLong unhandledEventCount = new AtomicLong(0);
     
     private final AtomicBoolean firstExceptionLogged = new 
AtomicBoolean(false);
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index f40b1a6710a..8c47d540749 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -21,10 +21,11 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
@@ -85,7 +86,7 @@ public final class PipelineJobProgressPersistService {
     }
     
     private static void notifyPersist(final PipelineJobProgressPersistContext 
persistContext) {
-        persistContext.getHasNewEvents().set(true);
+        persistContext.getUnhandledEventCount().incrementAndGet();
     }
     
     private static Optional<PipelineJobProgressPersistContext> 
getPersistContext(final String jobId, final int shardingItem) {
@@ -101,8 +102,8 @@ public final class PipelineJobProgressPersistService {
      */
     public static void persistNow(final String jobId, final int shardingItem) {
         getPersistContext(jobId, shardingItem).ifPresent(persistContext -> {
-            if (null == 
persistContext.getBeforePersistingProgressMillis().get()) {
-                log.warn("Force persisting progress is not permitted since 
there is no previous persisting, jobId={}, shardingItem={}", jobId, 
shardingItem);
+            if (persistContext.getUnhandledEventCount().get() <= 0) {
+                log.info("Force persisting progress is not permitted since 
there is no unhandled event, jobId={}, shardingItem={}", jobId, shardingItem);
                 return;
             }
             notifyPersist(persistContext);
@@ -135,23 +136,19 @@ public final class PipelineJobProgressPersistService {
         }
         
         private static void persist0(final String jobId, final int 
shardingItem, final PipelineJobProgressPersistContext persistContext) {
-            Long beforePersistingProgressMillis = 
persistContext.getBeforePersistingProgressMillis().get();
-            if ((null == beforePersistingProgressMillis || 
System.currentTimeMillis() - beforePersistingProgressMillis < 
TimeUnit.SECONDS.toMillis(DELAY_SECONDS))
-                    && !persistContext.getHasNewEvents().get()) {
+            long currentUnhandledEventCount = 
persistContext.getUnhandledEventCount().get();
+            ShardingSpherePreconditions.checkState(currentUnhandledEventCount 
>= 0, () -> new IllegalStateException("Current unhandled event count must be 
greater than or equal to 0"));
+            if (0 == currentUnhandledEventCount) {
                 return;
             }
             Optional<PipelineJobItemContext> jobItemContext = 
PipelineJobRegistry.getItemContext(jobId, shardingItem);
             if (!jobItemContext.isPresent()) {
                 return;
             }
-            if (null == beforePersistingProgressMillis) {
-                
persistContext.getBeforePersistingProgressMillis().set(System.currentTimeMillis());
-            }
-            persistContext.getHasNewEvents().set(false);
             long startTimeMillis = System.currentTimeMillis();
             new 
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
                     
PipelineJobIdUtils.parseJobType(jobId).getType()).getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
-            persistContext.getBeforePersistingProgressMillis().set(null);
+            
persistContext.getUnhandledEventCount().addAndGet(-currentUnhandledEventCount);
             if (6 == ThreadLocalRandom.current().nextInt(100)) {
                 log.info("persist, jobId={}, shardingItem={}, cost {} ms", 
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
             }

Reply via email to