This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new dde2cab082c Adjust thread pools scope from job item level to job level (#29729)
dde2cab082c is described below

commit dde2cab082c87c5d1c58815c9117804da23ff69d
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Jan 15 19:14:48 2024 +0800

    Adjust thread pools scope from job item level to job level (#29729)
---
 .../core/job/AbstractSeparablePipelineJob.java     | 28 +++++++++++++++-------
 .../data/pipeline/core/task/IncrementalTask.java   |  6 +++--
 .../data/pipeline/core/task/InventoryTask.java     |  6 +++--
 .../consistencycheck/ConsistencyCheckJob.java      |  9 +++++++
 ...tencyCheckJobConfigurationChangedProcessor.java |  2 +-
 .../pipeline/scenario/migration/MigrationJob.java  |  4 ++++
 .../MigrationJobConfigurationChangedProcessor.java |  2 +-
 .../consistencycheck/ConsistencyCheckJobTest.java  |  2 +-
 8 files changed, 43 insertions(+), 16 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 432bfe76724..56ce33a6d83 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.job;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
@@ -48,17 +47,34 @@ import java.sql.SQLException;
  * @param <I> type of pipeline job item context
  * @param <P> type of pipeline job item progress
  */
-@RequiredArgsConstructor
 @Getter
 @Slf4j
 public abstract class AbstractSeparablePipelineJob<T extends 
PipelineJobConfiguration, I extends PipelineJobItemContext, P extends 
PipelineJobItemProgress> implements PipelineJob {
     
     private final PipelineJobRunnerManager jobRunnerManager;
     
+    private final TransmissionProcessContext jobProcessContext;
+    
     private final PipelineProcessConfigurationPersistService 
processConfigPersistService = new PipelineProcessConfigurationPersistService();
     
+    // TODO Remove constructor
     protected AbstractSeparablePipelineJob() {
-        this(new PipelineJobRunnerManager());
+        this("");
+    }
+    
+    protected AbstractSeparablePipelineJob(final String jobId) {
+        jobRunnerManager = new PipelineJobRunnerManager();
+        jobProcessContext = isTransmissionProcessContextNeeded() ? 
createTransmissionProcessContext(jobId) : null;
+    }
+    
+    protected boolean isTransmissionProcessContextNeeded() {
+        return true;
+    }
+    
+    private TransmissionProcessContext createTransmissionProcessContext(final 
String jobId) {
+        PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(
+                
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobId), 
PipelineJobIdUtils.parseJobType(jobId).getType()));
+        return new TransmissionProcessContext(jobId, processConfig);
     }
     
     @Override
@@ -75,12 +91,6 @@ public abstract class AbstractSeparablePipelineJob<T extends 
PipelineJobConfigur
         T jobConfig = jobConfigManager.getJobConfiguration(jobId);
         PipelineJobItemManager<P> jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
         P jobItemProgress = 
jobItemManager.getProgress(shardingContext.getJobName(), 
shardingItem).orElse(null);
-        TransmissionProcessContext jobProcessContext = null;
-        if (!"CONSISTENCY_CHECK".equals(jobType.getType())) {
-            PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(
-                    
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
 jobType.getType()));
-            jobProcessContext = new 
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
-        }
         try {
             execute(buildJobItemContext(jobConfig, shardingItem, 
jobItemProgress, jobProcessContext));
             // CHECKSTYLE:OFF
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index bc72f79800c..ed6255bef0c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -52,8 +52,10 @@ public final class IncrementalTask implements PipelineTask {
     public Collection<CompletableFuture<?>> start() {
         
taskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
         Collection<CompletableFuture<?>> result = new LinkedList<>();
-        result.add(incrementalExecuteEngine.submit(dumper, new 
TaskExecuteCallback(this)));
-        importers.forEach(each -> 
result.add(incrementalExecuteEngine.submit(each, new 
TaskExecuteCallback(this))));
+        synchronized (incrementalExecuteEngine) {
+            result.add(incrementalExecuteEngine.submit(dumper, new 
TaskExecuteCallback(this)));
+            importers.forEach(each -> 
result.add(incrementalExecuteEngine.submit(each, new 
TaskExecuteCallback(this))));
+        }
         return result;
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index dc603f47709..19d88aea7a9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -54,8 +54,10 @@ public final class InventoryTask implements PipelineTask {
     @Override
     public Collection<CompletableFuture<?>> start() {
         Collection<CompletableFuture<?>> result = new LinkedList<>();
-        result.add(inventoryDumperExecuteEngine.submit(dumper, new 
TaskExecuteCallback(this)));
-        result.add(inventoryImporterExecuteEngine.submit(importer, new 
TaskExecuteCallback(this)));
+        synchronized (inventoryDumperExecuteEngine) {
+            result.add(inventoryDumperExecuteEngine.submit(dumper, new 
TaskExecuteCallback(this)));
+            result.add(inventoryImporterExecuteEngine.submit(importer, new 
TaskExecuteCallback(this)));
+        }
         return result;
     }
     
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index fca9649325c..c6fa64b08a2 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -31,6 +31,15 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task.Co
  */
 public final class ConsistencyCheckJob extends 
AbstractSeparablePipelineJob<ConsistencyCheckJobConfiguration, 
ConsistencyCheckJobItemContext, ConsistencyCheckJobItemProgress> {
     
+    public ConsistencyCheckJob(final String jobId) {
+        super(jobId);
+    }
+    
+    @Override
+    protected boolean isTransmissionProcessContextNeeded() {
+        return false;
+    }
+    
     @Override
     public ConsistencyCheckJobItemContext buildJobItemContext(final 
ConsistencyCheckJobConfiguration jobConfig,
                                                               final int 
shardingItem, final ConsistencyCheckJobItemProgress jobItemProgress, final 
TransmissionProcessContext jobProcessContext) {
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
index 4afb66b446a..1a3d470f8b1 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
@@ -29,7 +29,7 @@ public final class 
ConsistencyCheckJobConfigurationChangedProcessor implements J
     
     @Override
     public PipelineJob createJob(final ConsistencyCheckJobConfiguration 
jobConfig) {
-        return new ConsistencyCheckJob();
+        return new ConsistencyCheckJob(jobConfig.getJobId());
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 46bd463341e..38a5ee7affe 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -55,6 +55,10 @@ public final class MigrationJob extends 
AbstractSeparablePipelineJob<MigrationJo
     
     private final MigrationJobPreparer jobPreparer = new 
MigrationJobPreparer();
     
+    public MigrationJob(final String jobId) {
+        super(jobId);
+    }
+    
     @Override
     protected MigrationJobItemContext buildJobItemContext(final 
MigrationJobConfiguration jobConfig,
                                                           final int 
shardingItem, final TransmissionJobItemProgress jobItemProgress, final 
TransmissionProcessContext jobProcessContext) {
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
index 39545ca0355..af1cb8fb894 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
@@ -32,7 +32,7 @@ public final class MigrationJobConfigurationChangedProcessor 
implements JobConfi
     
     @Override
     public PipelineJob createJob(final MigrationJobConfiguration jobConfig) {
-        return new MigrationJob();
+        return new MigrationJob(jobConfig.getJobId());
     }
     
     @Override
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 16468324e59..19de73a4f19 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -59,7 +59,7 @@ class ConsistencyCheckJobTest {
         Map<String, Object> expectTableCheckPosition = 
Collections.singletonMap("t_order", 100);
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId,
 0,
                 
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
-        ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob();
+        ConsistencyCheckJob consistencyCheckJob = new 
ConsistencyCheckJob("j02");
         ConsistencyCheckJobConfiguration jobConfig = new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(createYamlConsistencyCheckJobConfiguration(checkJobId));
         PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager 
= new PipelineJobItemManager<>(new 
ConsistencyCheckJobType().getYamlJobItemProgressSwapper());
         Optional<ConsistencyCheckJobItemProgress> jobItemProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), 0);

Reply via email to