This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 670d974860e Refactor TransmissionTasksRunner (#32766)
670d974860e is described below
commit 670d974860e56ef1897cd187f64f464825c22296
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Sep 2 12:46:23 2024 +0800
Refactor TransmissionTasksRunner (#32766)
---
.../core/task/runner/TransmissionTasksRunner.java | 84 +++++++++-------------
1 file changed, 35 insertions(+), 49 deletions(-)
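
Note on the patch below: it drops the now-unused PipelineJobManager field, reuses the jobItemManager field in start() instead of building a new PipelineJobItemManager, renames executeInventoryTask/executeIncrementalTask to their plural forms, and replaces the manual "if stopping then throw" blocks with ShardingSpherePreconditions.checkState(...). A minimal, self-contained sketch of that precondition idiom follows; the checkState helper here is a hypothetical stand-in written for this sketch, not the actual ShardingSpherePreconditions API.

    import java.util.function.Supplier;

    public final class PreconditionIdiomSketch {

        // Hypothetical stand-in for ShardingSpherePreconditions.checkState:
        // throw the supplied exception when the expected state does not hold.
        private static <T extends Exception> void checkState(final boolean expectedState, final Supplier<T> exceptionSupplier) throws T {
            if (!expectedState) {
                throw exceptionSupplier.get();
            }
        }

        public static void main(final String[] args) throws Exception {
            boolean stopping = false;
            // Mirrors the patched call: checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new)
            checkState(!stopping, IllegalStateException::new);
            System.out.println("Not stopping, tasks may start.");
        }
    }
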
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
index d869bd2a2ef..0d84ed1f7df 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
@@ -31,9 +31,9 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProg
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
+import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
@@ -57,8 +57,6 @@ public class TransmissionTasksRunner implements PipelineTasksRunner {
 
     private final PipelineJobType jobType;
 
-    private final PipelineJobManager jobManager;
-
     private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager;
 
     public TransmissionTasksRunner(final TransmissionJobItemContext jobItemContext) {
@@ -66,40 +64,23 @@ public class TransmissionTasksRunner implements PipelineTasksRunner {
         inventoryTasks = jobItemContext.getInventoryTasks();
         incrementalTasks = jobItemContext.getIncrementalTasks();
         jobType = TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
-        jobManager = new PipelineJobManager(jobType);
         jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
     }
 
-    @Override
-    public void stop() {
-        jobItemContext.setStopping(true);
-        for (PipelineTask each : inventoryTasks) {
-            each.stop();
-            QuietlyCloser.close(each);
-        }
-        for (PipelineTask each : incrementalTasks) {
-            each.stop();
-            QuietlyCloser.close(each);
-        }
-    }
-
     @Override
     public void start() {
-        if (jobItemContext.isStopping()) {
-            throw new PipelineJobCancelingException();
-        }
-        new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
-                .getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
+        ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
+        jobItemManager.persistProgress(jobItemContext);
         if (PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
             log.info("All inventory tasks finished.");
-            executeIncrementalTask();
+            executeIncrementalTasks();
         } else {
-            executeInventoryTask();
+            executeInventoryTasks();
         }
     }
 
-    private synchronized void executeInventoryTask() {
-        updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK);
+    private synchronized void executeInventoryTasks() {
+        updateJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK);
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
         for (PipelineTask each : inventoryTasks) {
             if (each.getTaskProgress().getPosition() instanceof IngestFinishedPosition) {
@@ -110,24 +91,17 @@ public class TransmissionTasksRunner implements PipelineTasksRunner {
         ExecuteEngine.trigger(futures, new InventoryTaskExecuteCallback());
     }
 
-    private void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
-        jobItemContext.setStatus(jobStatus);
-        jobItemManager.updateStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
-    }
-
-    private synchronized void executeIncrementalTask() {
-        if (jobItemContext.isStopping()) {
-            throw new PipelineJobCancelingException();
-        }
+    private synchronized void executeIncrementalTasks() {
+        ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
         if (incrementalTasks.isEmpty()) {
-            log.info("incrementalTasks empty, ignore");
+            log.info("Incremental tasks are empty, ignore.");
             return;
         }
         if (JobStatus.EXECUTE_INCREMENTAL_TASK == jobItemContext.getStatus()) {
-            log.info("job status already EXECUTE_INCREMENTAL_TASK, ignore");
+            log.info("Incremental tasks had already run, ignore.");
             return;
         }
-        updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
+        updateJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
         for (PipelineTask each : incrementalTasks) {
             if (each.getTaskProgress().getPosition() instanceof IngestFinishedPosition) {
@@ -138,13 +112,21 @@ public class TransmissionTasksRunner implements PipelineTasksRunner {
         ExecuteEngine.trigger(futures, new IncrementalExecuteCallback());
     }
 
-    protected void inventorySuccessCallback() {
-        if (PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
-            log.info("onSuccess, all inventory tasks finished.");
-            PipelineJobProgressPersistService.persistNow(jobItemContext.getJobId(), jobItemContext.getShardingItem());
-            executeIncrementalTask();
-        } else {
-            log.info("onSuccess, inventory tasks not finished");
+    private void updateJobItemStatus(final JobStatus jobStatus) {
+        jobItemContext.setStatus(jobStatus);
+        jobItemManager.updateStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
+    }
+
+    @Override
+    public void stop() {
+        jobItemContext.setStopping(true);
+        for (PipelineTask each : inventoryTasks) {
+            each.stop();
+            QuietlyCloser.close(each);
+        }
+        for (PipelineTask each : incrementalTasks) {
+            each.stop();
+            QuietlyCloser.close(each);
         }
     }
 
@@ -152,10 +134,14 @@ public class TransmissionTasksRunner implements PipelineTasksRunner {
 
         @Override
         public void onSuccess() {
-            if (jobItemContext.isStopping()) {
-                throw new PipelineJobCancelingException();
+            ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
+            if (PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
+                log.info("onSuccess, all inventory tasks finished.");
+                PipelineJobProgressPersistService.persistNow(jobItemContext.getJobId(), jobItemContext.getShardingItem());
+                executeIncrementalTasks();
+            } else {
+                log.info("onSuccess, inventory tasks did not finish.");
             }
-            inventorySuccessCallback();
         }
 
         @Override
@@ -163,7 +149,7 @@ public class TransmissionTasksRunner implements PipelineTasksRunner {
         }
     }
 
-    private final class IncrementalExecuteCallback implements ExecuteCallback {
+    private static final class IncrementalExecuteCallback implements ExecuteCallback {
 
         @Override
         public void onSuccess() {