This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a17d519718b Refactor AbstractPipelineJob (#29337)
a17d519718b is described below
commit a17d519718ba5f95f856ebe522fa4ae23fe071c7
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 9 15:43:05 2023 +0800
Refactor AbstractPipelineJob (#29337)
---
.../pipeline/core/job/AbstractPipelineJob.java | 54 +++++++++-------------
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 2 +-
.../consistencycheck/ConsistencyCheckJob.java | 2 +-
.../pipeline/scenario/migration/MigrationJob.java | 2 +-
4 files changed, 26 insertions(+), 34 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 402d9546f7a..75b09d70ad4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -38,7 +38,6 @@ import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -48,6 +47,8 @@ import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public abstract class AbstractPipelineJob implements PipelineJob {
+ private static final long JOB_WAITING_TIMEOUT_MILLS = 2000L;
+
@Getter
private final String jobId;
@@ -107,49 +108,40 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
@Override
public final void stop() {
try {
- innerStop();
+ stopping.set(true);
+ log.info("Stop tasks runner, jobId={}", jobId);
+ tasksRunners.values().forEach(PipelineTasksRunner::stop);
+ awaitJobStopped(jobId);
+ if (null != jobBootstrap.get()) {
+ jobBootstrap.get().shutdown();
+ }
} finally {
- innerClean();
- doClean();
+ PipelineJobProgressPersistService.remove(jobId);
+ tasksRunners.values().stream().map(each ->
each.getJobItemContext().getJobProcessContext()).forEach(QuietlyCloser::close);
+ clean();
}
}
- private void innerStop() {
- stopping.set(true);
- log.info("stop tasks runner, jobId={}", jobId);
- for (PipelineTasksRunner each : tasksRunners.values()) {
- each.stop();
+ private void awaitJobStopped(final String jobId) {
+ Optional<ElasticJobListener> jobListener =
ElasticJobServiceLoader.getCachedTypedServiceInstance(ElasticJobListener.class,
PipelineElasticJobListener.class.getName());
+ if (!jobListener.isPresent()) {
+ return;
}
- Optional<ElasticJobListener> pipelineJobListener =
ElasticJobServiceLoader.getCachedTypedServiceInstance(ElasticJobListener.class,
PipelineElasticJobListener.class.getName());
- pipelineJobListener.ifPresent(optional ->
awaitJobStopped((PipelineElasticJobListener) optional, jobId,
TimeUnit.SECONDS.toMillis(2)));
- if (null != jobBootstrap.get()) {
- jobBootstrap.get().shutdown();
- }
- }
-
- private void awaitJobStopped(final PipelineElasticJobListener jobListener,
final String jobId, final long timeoutMillis) {
- int time = 0;
- int sleepTime = 50;
- while (time < timeoutMillis) {
- if (!jobListener.isJobRunning(jobId)) {
+ long spentMills = 0L;
+ long sleepMillis = 50L;
+ while (spentMills < JOB_WAITING_TIMEOUT_MILLS) {
+ if (!((PipelineElasticJobListener)
jobListener.get()).isJobRunning(jobId)) {
break;
}
try {
- Thread.sleep(sleepTime);
+ Thread.sleep(sleepMillis);
} catch (final InterruptedException ignored) {
Thread.currentThread().interrupt();
break;
}
- time += sleepTime;
- }
- }
-
- private void innerClean() {
- PipelineJobProgressPersistService.remove(jobId);
- for (PipelineTasksRunner each : tasksRunners.values()) {
-
QuietlyCloser.close(each.getJobItemContext().getJobProcessContext());
+ spentMills += sleepMillis;
}
}
- protected abstract void doClean();
+ protected abstract void clean();
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index b957fb2c7b5..3150895b804 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -170,7 +170,7 @@ public final class CDCJob extends
AbstractInseparablePipelineJob implements Simp
}
@Override
- protected void doClean() {
+ protected void clean() {
dataSourceManager.close();
QuietlyCloser.close(sink);
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index d669eda81e1..5d13dd54240 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -60,6 +60,6 @@ public final class ConsistencyCheckJob extends
AbstractSeparablePipelineJob {
}
@Override
- protected void doClean() {
+ protected void clean() {
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index b990ef1a620..5a204212d5b 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -142,7 +142,7 @@ public final class MigrationJob extends
AbstractSeparablePipelineJob {
}
@Override
- public void doClean() {
+ public void clean() {
dataSourceManager.close();
}
}