This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ee48bc12566 Improve job progress persisting (#30441)
ee48bc12566 is described below
commit ee48bc125664eeb825895f6978ef34f042ba2484
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Mar 9 20:10:51 2024 +0800
Improve job progress persisting (#30441)
* Improve unhandled event count measuring in job progress persist service
* Try to persist job progress immediately when job item execute successfully
---
.../core/job/AbstractSeparablePipelineJob.java | 4 ++++
.../persist/PipelineJobProgressPersistContext.java | 6 ++----
.../persist/PipelineJobProgressPersistService.java | 21 +++++++++------------
3 files changed, 15 insertions(+), 16 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 855bd8ed717..03fc042ca74 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -29,6 +29,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
@@ -87,6 +88,9 @@ public abstract class AbstractSeparablePipelineJob<T extends
PipelineJobConfigur
boolean started = false;
try {
started = execute(buildJobItemContext(jobConfig, shardingItem,
jobItemProgress, jobProcessContext));
+ if (started) {
+ PipelineJobProgressPersistService.persistNow(jobId,
shardingItem);
+ }
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
index d91b04a9102..beafba57ba3 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
@@ -21,7 +21,7 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Pipeline job progress persist context.
@@ -34,9 +34,7 @@ public final class PipelineJobProgressPersistContext {
private final int shardingItem;
- private final AtomicBoolean hasNewEvents = new AtomicBoolean(false);
-
- private final AtomicReference<Long> beforePersistingProgressMillis = new
AtomicReference<>(null);
+ private final AtomicLong unhandledEventCount = new AtomicLong(0);
private final AtomicBoolean firstExceptionLogged = new
AtomicBoolean(false);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index f40b1a6710a..8c47d540749 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -21,10 +21,11 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -85,7 +86,7 @@ public final class PipelineJobProgressPersistService {
}
private static void notifyPersist(final PipelineJobProgressPersistContext
persistContext) {
- persistContext.getHasNewEvents().set(true);
+ persistContext.getUnhandledEventCount().incrementAndGet();
}
private static Optional<PipelineJobProgressPersistContext>
getPersistContext(final String jobId, final int shardingItem) {
@@ -101,8 +102,8 @@ public final class PipelineJobProgressPersistService {
*/
public static void persistNow(final String jobId, final int shardingItem) {
getPersistContext(jobId, shardingItem).ifPresent(persistContext -> {
- if (null ==
persistContext.getBeforePersistingProgressMillis().get()) {
- log.warn("Force persisting progress is not permitted since
there is no previous persisting, jobId={}, shardingItem={}", jobId,
shardingItem);
+ if (persistContext.getUnhandledEventCount().get() <= 0) {
+ log.info("Force persisting progress is not permitted since
there is no unhandled event, jobId={}, shardingItem={}", jobId, shardingItem);
return;
}
notifyPersist(persistContext);
@@ -135,23 +136,19 @@ public final class PipelineJobProgressPersistService {
}
private static void persist0(final String jobId, final int
shardingItem, final PipelineJobProgressPersistContext persistContext) {
- Long beforePersistingProgressMillis =
persistContext.getBeforePersistingProgressMillis().get();
- if ((null == beforePersistingProgressMillis ||
System.currentTimeMillis() - beforePersistingProgressMillis <
TimeUnit.SECONDS.toMillis(DELAY_SECONDS))
- && !persistContext.getHasNewEvents().get()) {
+ long currentUnhandledEventCount =
persistContext.getUnhandledEventCount().get();
+ ShardingSpherePreconditions.checkState(currentUnhandledEventCount
>= 0, () -> new IllegalStateException("Current unhandled event count must be
greater than or equal to 0"));
+ if (0 == currentUnhandledEventCount) {
return;
}
Optional<PipelineJobItemContext> jobItemContext =
PipelineJobRegistry.getItemContext(jobId, shardingItem);
if (!jobItemContext.isPresent()) {
return;
}
- if (null == beforePersistingProgressMillis) {
-
persistContext.getBeforePersistingProgressMillis().set(System.currentTimeMillis());
- }
- persistContext.getHasNewEvents().set(false);
long startTimeMillis = System.currentTimeMillis();
new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobId).getType()).getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
- persistContext.getBeforePersistingProgressMillis().set(null);
+
persistContext.getUnhandledEventCount().addAndGet(-currentUnhandledEventCount);
if (6 == ThreadLocalRandom.current().nextInt(100)) {
log.info("persist, jobId={}, shardingItem={}, cost {} ms",
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
}