This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new dde2cab082c Adjust thread pools scope from job item level to job level
(#29729)
dde2cab082c is described below
commit dde2cab082c87c5d1c58815c9117804da23ff69d
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Jan 15 19:14:48 2024 +0800
Adjust thread pools scope from job item level to job level (#29729)
---
.../core/job/AbstractSeparablePipelineJob.java | 28 +++++++++++++++-------
.../data/pipeline/core/task/IncrementalTask.java | 6 +++--
.../data/pipeline/core/task/InventoryTask.java | 6 +++--
.../consistencycheck/ConsistencyCheckJob.java | 9 +++++++
...tencyCheckJobConfigurationChangedProcessor.java | 2 +-
.../pipeline/scenario/migration/MigrationJob.java | 4 ++++
.../MigrationJobConfigurationChangedProcessor.java | 2 +-
.../consistencycheck/ConsistencyCheckJobTest.java | 2 +-
8 files changed, 43 insertions(+), 16 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 432bfe76724..56ce33a6d83 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
@@ -48,17 +47,34 @@ import java.sql.SQLException;
* @param <I> type of pipeline job item context
* @param <P> type of pipeline job item progress
*/
-@RequiredArgsConstructor
@Getter
@Slf4j
public abstract class AbstractSeparablePipelineJob<T extends
PipelineJobConfiguration, I extends PipelineJobItemContext, P extends
PipelineJobItemProgress> implements PipelineJob {
private final PipelineJobRunnerManager jobRunnerManager;
+ private final TransmissionProcessContext jobProcessContext;
+
private final PipelineProcessConfigurationPersistService
processConfigPersistService = new PipelineProcessConfigurationPersistService();
+ // TODO Remove constructor
protected AbstractSeparablePipelineJob() {
- this(new PipelineJobRunnerManager());
+ this("");
+ }
+
+ protected AbstractSeparablePipelineJob(final String jobId) {
+ jobRunnerManager = new PipelineJobRunnerManager();
+ jobProcessContext = isTransmissionProcessContextNeeded() ?
createTransmissionProcessContext(jobId) : null;
+ }
+
+ protected boolean isTransmissionProcessContextNeeded() {
+ return true;
+ }
+
+ private TransmissionProcessContext createTransmissionProcessContext(final
String jobId) {
+ PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.convertWithDefaultValue(
+
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobId),
PipelineJobIdUtils.parseJobType(jobId).getType()));
+ return new TransmissionProcessContext(jobId, processConfig);
}
@Override
@@ -75,12 +91,6 @@ public abstract class AbstractSeparablePipelineJob<T extends
PipelineJobConfigur
T jobConfig = jobConfigManager.getJobConfiguration(jobId);
PipelineJobItemManager<P> jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
P jobItemProgress =
jobItemManager.getProgress(shardingContext.getJobName(),
shardingItem).orElse(null);
- TransmissionProcessContext jobProcessContext = null;
- if (!"CONSISTENCY_CHECK".equals(jobType.getType())) {
- PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.convertWithDefaultValue(
-
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
jobType.getType()));
- jobProcessContext = new
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
- }
try {
execute(buildJobItemContext(jobConfig, shardingItem,
jobItemProgress, jobProcessContext));
// CHECKSTYLE:OFF
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index bc72f79800c..ed6255bef0c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -52,8 +52,10 @@ public final class IncrementalTask implements PipelineTask {
public Collection<CompletableFuture<?>> start() {
taskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
Collection<CompletableFuture<?>> result = new LinkedList<>();
- result.add(incrementalExecuteEngine.submit(dumper, new
TaskExecuteCallback(this)));
- importers.forEach(each ->
result.add(incrementalExecuteEngine.submit(each, new
TaskExecuteCallback(this))));
+ synchronized (incrementalExecuteEngine) {
+ result.add(incrementalExecuteEngine.submit(dumper, new
TaskExecuteCallback(this)));
+ importers.forEach(each ->
result.add(incrementalExecuteEngine.submit(each, new
TaskExecuteCallback(this))));
+ }
return result;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index dc603f47709..19d88aea7a9 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -54,8 +54,10 @@ public final class InventoryTask implements PipelineTask {
@Override
public Collection<CompletableFuture<?>> start() {
Collection<CompletableFuture<?>> result = new LinkedList<>();
- result.add(inventoryDumperExecuteEngine.submit(dumper, new
TaskExecuteCallback(this)));
- result.add(inventoryImporterExecuteEngine.submit(importer, new
TaskExecuteCallback(this)));
+ synchronized (inventoryDumperExecuteEngine) {
+ result.add(inventoryDumperExecuteEngine.submit(dumper, new
TaskExecuteCallback(this)));
+ result.add(inventoryImporterExecuteEngine.submit(importer, new
TaskExecuteCallback(this)));
+ }
return result;
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index fca9649325c..c6fa64b08a2 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -31,6 +31,15 @@ import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task.Co
*/
public final class ConsistencyCheckJob extends
AbstractSeparablePipelineJob<ConsistencyCheckJobConfiguration,
ConsistencyCheckJobItemContext, ConsistencyCheckJobItemProgress> {
+ public ConsistencyCheckJob(final String jobId) {
+ super(jobId);
+ }
+
+ @Override
+ protected boolean isTransmissionProcessContextNeeded() {
+ return false;
+ }
+
@Override
public ConsistencyCheckJobItemContext buildJobItemContext(final
ConsistencyCheckJobConfiguration jobConfig,
final int
shardingItem, final ConsistencyCheckJobItemProgress jobItemProgress, final
TransmissionProcessContext jobProcessContext) {
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
index 4afb66b446a..1a3d470f8b1 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
@@ -29,7 +29,7 @@ public final class
ConsistencyCheckJobConfigurationChangedProcessor implements J
@Override
public PipelineJob createJob(final ConsistencyCheckJobConfiguration
jobConfig) {
- return new ConsistencyCheckJob();
+ return new ConsistencyCheckJob(jobConfig.getJobId());
}
@Override
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 46bd463341e..38a5ee7affe 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -55,6 +55,10 @@ public final class MigrationJob extends
AbstractSeparablePipelineJob<MigrationJo
private final MigrationJobPreparer jobPreparer = new
MigrationJobPreparer();
+ public MigrationJob(final String jobId) {
+ super(jobId);
+ }
+
@Override
protected MigrationJobItemContext buildJobItemContext(final
MigrationJobConfiguration jobConfig,
final int
shardingItem, final TransmissionJobItemProgress jobItemProgress, final
TransmissionProcessContext jobProcessContext) {
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
index 39545ca0355..af1cb8fb894 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
@@ -32,7 +32,7 @@ public final class MigrationJobConfigurationChangedProcessor
implements JobConfi
@Override
public PipelineJob createJob(final MigrationJobConfiguration jobConfig) {
- return new MigrationJob();
+ return new MigrationJob(jobConfig.getJobId());
}
@Override
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 16468324e59..19de73a4f19 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -59,7 +59,7 @@ class ConsistencyCheckJobTest {
Map<String, Object> expectTableCheckPosition =
Collections.singletonMap("t_order", 100);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId,
0,
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
- ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob();
+ ConsistencyCheckJob consistencyCheckJob = new
ConsistencyCheckJob("j02");
ConsistencyCheckJobConfiguration jobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(createYamlConsistencyCheckJobConfiguration(checkJobId));
PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(new
ConsistencyCheckJobType().getYamlJobItemProgressSwapper());
Optional<ConsistencyCheckJobItemProgress> jobItemProgress =
jobItemManager.getProgress(jobConfig.getJobId(), 0);