This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4442e3d2d1d Refactor AbstractInseparablePipelineJob (#32748)
4442e3d2d1d is described below
commit 4442e3d2d1d66700b689df143a56176132499961
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 31 20:22:53 2024 +0800
Refactor AbstractInseparablePipelineJob (#32748)
* Refactor AbstractInseparablePipelineJob
* Refactor AbstractInseparablePipelineJob
---
.../core/job/AbstractInseparablePipelineJob.java | 21 +++++++++++++++++++--
.../core/job/AbstractSeparablePipelineJob.java | 4 +---
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 14 ++++----------
.../data/pipeline/cdc/api/CDCJobAPI.java | 2 +-
4 files changed, 25 insertions(+), 16 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index bf320c580ac..b6d4c28eac5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
@@ -30,8 +31,11 @@ import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfig
import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -55,6 +59,19 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
private final PipelineJobRunnerManager jobRunnerManager;
+ private final TransmissionProcessContext jobProcessContext;
+
+ protected AbstractInseparablePipelineJob(final String jobId, final PipelineJobRunnerManager jobRunnerManager) {
+ this.jobRunnerManager = jobRunnerManager;
+ jobProcessContext = createTransmissionProcessContext(jobId);
+ }
+
+ private TransmissionProcessContext createTransmissionProcessContext(final String jobId) {
+ PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
+ new PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId), PipelineJobIdUtils.parseJobType(jobId).getType()));
+ return new TransmissionProcessContext(jobId, processConfig);
+ }
+
@SuppressWarnings("unchecked")
@Override
public final void execute(final ShardingContext shardingContext) {
@@ -71,7 +88,7 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
return;
}
P jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
- I jobItemContext = buildJobItemContext(jobConfig, shardingItem, jobItemProgress);
+ I jobItemContext = buildJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext);
if (!jobRunnerManager.addTasksRunner(shardingItem, buildTasksRunner(jobItemContext))) {
continue;
}
@@ -88,7 +105,7 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
executeIncrementalTasks(jobItemContexts, jobItemManager);
}
- protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P jobItemProgress);
+ protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P jobItemProgress, TransmissionProcessContext jobProcessContext);
protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 9ba500ebd93..5afdc66cb8b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -54,8 +54,6 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
private final TransmissionProcessContext jobProcessContext;
- private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
-
protected AbstractSeparablePipelineJob(final String jobId) {
this(jobId, true);
}
@@ -67,7 +65,7 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
private TransmissionProcessContext createTransmissionProcessContext(final String jobId) {
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
- processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobId), PipelineJobIdUtils.parseJobType(jobId).getType()));
+ new PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId), PipelineJobIdUtils.parseJobType(jobId).getType()));
return new TransmissionProcessContext(jobId, processConfig);
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 68ef97f5cae..c210fffc0e0 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -49,9 +49,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunner
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
@@ -72,23 +70,19 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobConfigura
private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
- private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
-
private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
@Getter
private final PipelineSink sink;
- public CDCJob(final PipelineSink sink) {
- super(new PipelineJobRunnerManager(new CDCJobRunnerCleaner(sink)));
+ public CDCJob(final String jobId, final PipelineSink sink) {
+ super(jobId, new PipelineJobRunnerManager(new CDCJobRunnerCleaner(sink)));
this.sink = sink;
}
@Override
- protected CDCJobItemContext buildJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem, final TransmissionJobItemProgress jobItemProgress) {
- PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
- processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), "STREAMING"));
- TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
+ protected CDCJobItemContext buildJobItemContext(final CDCJobConfiguration jobConfig,
+ final int shardingItem, final TransmissionJobItemProgress jobItemProgress, final TransmissionProcessContext jobProcessContext) {
CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getProcessConfiguration());
return new CDCJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager(), sink);
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 2e9d6187cf8..805a15c1666 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -223,7 +223,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
* @param sink sink
*/
public void start(final String jobId, final PipelineSink sink) {
- CDCJob job = new CDCJob(sink);
+ CDCJob job = new CDCJob(jobId, sink);
PipelineJobRegistry.add(jobId, job);
enable(jobId);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);