This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a606bda3897 Improve pipeline job progress persistence (#19775)
a606bda3897 is described below

commit a606bda38974f157dc56302eabd43222776b79a8
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Aug 2 13:46:01 2022 +0800

    Improve pipeline job progress persistence (#19775)
---
 .../api/job/persist/PipelineJobPersistContext.java |  5 ++-
 .../rulealtered/RuleAlteredJobPersistService.java  | 38 +++++++++++++---------
 2 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java
index 2f0b185e097..ea590ea94f0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java
@@ -21,6 +21,7 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 @Getter
 @RequiredArgsConstructor
@@ -30,5 +31,7 @@ public final class PipelineJobPersistContext {
     
     private final int shardingItem;
     
-    private final AtomicBoolean alreadyPersisted = new AtomicBoolean(false);
+    private final AtomicBoolean hasNewEvents = new AtomicBoolean(false);
+    
+    private final AtomicReference<Long> beforePersistingProgressMillis = new 
AtomicReference<>(null);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java
index 3d5245bfef1..ee52e0d4864 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java
@@ -29,8 +29,8 @@ import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Rule altered job persist service.
@@ -45,8 +45,10 @@ public final class RuleAlteredJobPersistService {
     
     private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = 
Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-schedule-%d"));
     
+    private static final long DELAY_SECONDS = 1;
+    
     static {
-        JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new 
PersistJobContextRunnable(), 5, 1, TimeUnit.SECONDS);
+        JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new 
PersistJobContextRunnable(), 5, DELAY_SECONDS, TimeUnit.SECONDS);
     }
     
     /**
@@ -83,34 +85,40 @@ public final class RuleAlteredJobPersistService {
             log.debug("Persist interval parameter is null, jobId={}, 
shardingItem={}", jobId, shardingItem);
             return;
         }
-        parameter.getAlreadyPersisted().compareAndSet(true, false);
+        parameter.getHasNewEvents().set(true);
     }
     
-    private static void persist(final String jobId, final int shardingItem, 
final long persistTimeMillis, final PipelineJobPersistContext param) {
+    private static void persist(final String jobId, final int shardingItem, 
final PipelineJobPersistContext persistContext) {
+        Long beforePersistingProgressMillis = 
persistContext.getBeforePersistingProgressMillis().get();
+        if ((null == beforePersistingProgressMillis || 
System.currentTimeMillis() - beforePersistingProgressMillis < 
TimeUnit.SECONDS.toMillis(DELAY_SECONDS))
+                && !persistContext.getHasNewEvents().get()) {
+            return;
+        }
         Map<Integer, RuleAlteredJobScheduler> schedulerMap = 
RuleAlteredJobSchedulerCenter.getJobSchedulerMap(jobId);
         RuleAlteredJobScheduler scheduler = schedulerMap.get(shardingItem);
         if (null == scheduler) {
-            log.warn("job schedule not exists, job id: {}, sharding item: {}", 
jobId, shardingItem);
+            log.warn("persist, job schedule not exists, jobId={}, 
shardingItem={}", jobId, shardingItem);
             return;
         }
-        log.info("execute persist, job id={}, sharding item={}, 
persistTimeMillis={}", jobId, shardingItem, persistTimeMillis);
+        if (null == beforePersistingProgressMillis) {
+            
persistContext.getBeforePersistingProgressMillis().set(System.currentTimeMillis());
+        }
+        persistContext.getHasNewEvents().set(false);
+        long startTimeMillis = System.currentTimeMillis();
         REPOSITORY_API.persistJobProgress(scheduler.getJobContext());
-        param.getAlreadyPersisted().set(true);
+        persistContext.getBeforePersistingProgressMillis().set(null);
+        if (6 == ThreadLocalRandom.current().nextInt(100)) {
+            log.info("persist, jobId={}, shardingItem={}, cost time: {} ms", 
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
+        }
     }
     
     private static final class PersistJobContextRunnable implements Runnable {
         
         @Override
         public void run() {
-            long currentTimeMillis = System.currentTimeMillis();
             for (Entry<String, Map<Integer, PipelineJobPersistContext>> entry 
: JOB_PERSIST_MAP.entrySet()) {
-                entry.getValue().forEach((shardingItem, param) -> {
-                    AtomicBoolean alreadyPersisted = 
param.getAlreadyPersisted();
-                    if (alreadyPersisted.get()) {
-                        return;
-                    }
-                    persist(entry.getKey(), shardingItem, currentTimeMillis, 
param);
-                    alreadyPersisted.set(true);
+                entry.getValue().forEach((shardingItem, persistContext) -> {
+                    persist(entry.getKey(), shardingItem, persistContext);
                 });
             }
         }

Reply via email to