This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d2365c1a72c Refactor AbstractPipelineJob.jobId as final (#25647)
d2365c1a72c is described below

commit d2365c1a72c646f804136d4d1c18f7bb542dffcd
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat May 13 23:38:45 2023 +0800

    Refactor AbstractPipelineJob.jobId as final (#25647)
---
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java            |  2 +-
 .../data/pipeline/cdc/core/job/CDCJob.java               |  7 +++++--
 .../data/pipeline/core/job/AbstractPipelineJob.java      | 16 ++++++----------
 .../pipeline/core/job/AbstractSimplePipelineJob.java     |  5 ++++-
 .../impl/AbstractChangedJobConfigurationProcessor.java   |  4 ++--
 .../scenario/consistencycheck/ConsistencyCheckJob.java   |  4 ++++
 ...ChangedConsistencyCheckJobConfigurationProcessor.java |  4 ++--
 .../data/pipeline/scenario/migration/MigrationJob.java   |  6 ++++--
 .../ChangedMigrationJobConfigurationProcessor.java       |  4 ++--
 .../consistencycheck/ConsistencyCheckJobTest.java        |  5 +----
 10 files changed, 31 insertions(+), 26 deletions(-)

diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 6787dcfbe21..5ffd47a2619 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -226,7 +226,7 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
      * @param importerConnector importer connector
      */
     public void startJob(final String jobId, final ImporterConnector 
importerConnector) {
-        CDCJob job = new CDCJob(importerConnector);
+        CDCJob job = new CDCJob(jobId, importerConnector);
         PipelineJobCenter.addJob(jobId, job);
         updateJobConfigurationDisabled(jobId, false);
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 62e8fe62a64..ee6ea603e61 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.core.job;
 
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -43,7 +42,6 @@ import java.util.Optional;
 /**
  * CDC job.
  */
-@RequiredArgsConstructor
 @Slf4j
 public final class CDCJob extends AbstractSimplePipelineJob {
     
@@ -55,6 +53,11 @@ public final class CDCJob extends AbstractSimplePipelineJob {
     
     private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
     
+    public CDCJob(final String jobId, final ImporterConnector 
importerConnector) {
+        super(jobId);
+        this.importerConnector = importerConnector;
+    }
+    
     @Override
     protected void doPrepare(final PipelineJobItemContext jobItemContext) {
         jobPreparer.initTasks((CDCJobItemContext) jobItemContext);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index cf6a41cb263..adb71754739 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -49,10 +49,10 @@ import java.util.concurrent.TimeUnit;
 public abstract class AbstractPipelineJob implements PipelineJob {
     
     @Getter
-    private volatile String jobId;
+    private final String jobId;
     
     @Getter(AccessLevel.PROTECTED)
-    private volatile PipelineJobAPI jobAPI;
+    private final PipelineJobAPI jobAPI;
     
     @Getter
     private volatile boolean stopping;
@@ -62,7 +62,7 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     
     private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new 
ConcurrentHashMap<>();
     
-    protected void setJobId(final String jobId) {
+    protected AbstractPipelineJob(final String jobId) {
         this.jobId = jobId;
         jobAPI = TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobId).getTypeName());
     }
@@ -129,10 +129,8 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
         for (PipelineTasksRunner each : tasksRunnerMap.values()) {
             each.stop();
         }
-        if (null != jobId) {
-            Optional<ElasticJobListener> pipelineJobListener = 
ElasticJobServiceLoader.getCachedTypedServiceInstance(ElasticJobListener.class, 
PipelineElasticJobListener.class.getName());
-            pipelineJobListener.ifPresent(jobListener -> 
awaitJobStopped((PipelineElasticJobListener) jobListener, jobId, 
TimeUnit.SECONDS.toMillis(2)));
-        }
+        Optional<ElasticJobListener> pipelineJobListener = 
ElasticJobServiceLoader.getCachedTypedServiceInstance(ElasticJobListener.class, 
PipelineElasticJobListener.class.getName());
+        pipelineJobListener.ifPresent(jobListener -> 
awaitJobStopped((PipelineElasticJobListener) jobListener, jobId, 
TimeUnit.SECONDS.toMillis(2)));
         if (null != jobBootstrap) {
             jobBootstrap.shutdown();
         }
@@ -157,9 +155,7 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     
     private void innerClean() {
         tasksRunnerMap.clear();
-        if (null != jobId) {
-            
PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
-        }
+        
PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
     }
     
     protected abstract void doClean();
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 02477826d6d..2aa8225d639 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -29,6 +29,10 @@ import 
org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 @Slf4j
 public abstract class AbstractSimplePipelineJob extends AbstractPipelineJob 
implements SimpleJob {
     
+    protected AbstractSimplePipelineJob(final String jobId) {
+        super(jobId);
+    }
+    
     /**
      * Build pipeline job item context.
      * 
@@ -48,7 +52,6 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
             log.info("stopping true, ignore");
             return;
         }
-        setJobId(jobId);
         PipelineJobItemContext jobItemContext = 
buildPipelineJobItemContext(shardingContext);
         PipelineTasksRunner tasksRunner = 
buildPipelineTasksRunner(jobItemContext);
         if (!addTasksRunner(shardingItem, tasksRunner)) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
index 223a73b2aa8..d48001ab41a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
@@ -79,15 +79,15 @@ public abstract class 
AbstractChangedJobConfigurationProcessor implements Change
     protected abstract void onDeleted(JobConfiguration jobConfig);
     
     protected void executeJob(final JobConfiguration jobConfig) {
-        AbstractPipelineJob job = buildPipelineJob();
         String jobId = jobConfig.getJobName();
+        AbstractPipelineJob job = buildPipelineJob(jobId);
         PipelineJobCenter.addJob(jobId, job);
         OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
 job, jobConfig);
         job.setJobBootstrap(oneOffJobBootstrap);
         oneOffJobBootstrap.execute();
     }
     
-    protected abstract AbstractPipelineJob buildPipelineJob();
+    protected abstract AbstractPipelineJob buildPipelineJob(String jobId);
     
     protected abstract JobType getJobType();
     
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 3bb0cb7909e..e04b02a0c17 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -38,6 +38,10 @@ import java.util.Optional;
 @Slf4j
 public final class ConsistencyCheckJob extends AbstractSimplePipelineJob {
     
+    public ConsistencyCheckJob(final String jobId) {
+        super(jobId);
+    }
+    
     @Override
     public ConsistencyCheckJobItemContext buildPipelineJobItemContext(final 
ShardingContext shardingContext) {
         ConsistencyCheckJobConfiguration jobConfig = new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ChangedConsistencyCheckJobConfigurationProcessor.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ChangedConsistencyCheckJobConfigurationProcessor.java
index 5d418989f7f..a6039aed8ef 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ChangedConsistencyCheckJobConfigurationProcessor.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ChangedConsistencyCheckJobConfigurationProcessor.java
@@ -36,8 +36,8 @@ public final class 
ChangedConsistencyCheckJobConfigurationProcessor extends Abst
     }
     
     @Override
-    protected AbstractPipelineJob buildPipelineJob() {
-        return new ConsistencyCheckJob();
+    protected AbstractPipelineJob buildPipelineJob(final String jobId) {
+        return new ConsistencyCheckJob(jobId);
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index be4c0a586b8..ff598d5142f 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -42,7 +41,6 @@ import java.util.Optional;
 /**
  * Migration job.
  */
-@RequiredArgsConstructor
 @Slf4j
 public final class MigrationJob extends AbstractSimplePipelineJob {
     
@@ -53,6 +51,10 @@ public final class MigrationJob extends 
AbstractSimplePipelineJob {
     // Shared by all sharding items
     private final MigrationJobPreparer jobPreparer = new 
MigrationJobPreparer();
     
+    public MigrationJob(final String jobId) {
+        super(jobId);
+    }
+    
     @Override
     protected InventoryIncrementalJobItemContext 
buildPipelineJobItemContext(final ShardingContext shardingContext) {
         int shardingItem = shardingContext.getShardingItem();
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/ChangedMigrationJobConfigurationProcessor.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/ChangedMigrationJobConfigurationProcessor.java
index 7937dc8b338..de16d1ad589 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/ChangedMigrationJobConfigurationProcessor.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/ChangedMigrationJobConfigurationProcessor.java
@@ -39,8 +39,8 @@ public final class ChangedMigrationJobConfigurationProcessor 
extends AbstractCha
     }
     
     @Override
-    protected AbstractPipelineJob buildPipelineJob() {
-        return new MigrationJob();
+    protected AbstractPipelineJob buildPipelineJob(final String jobId) {
+        return new MigrationJob(jobId);
     }
     
     @Override
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 4e8969ae1f5..0dcd41a9279 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -20,7 +20,6 @@ package 
org.apache.shardingsphere.test.it.data.pipeline.scenario.consistencychec
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
@@ -33,7 +32,6 @@ import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfiguratio
 import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.mockito.internal.configuration.plugins.Plugins;
 
 import java.util.Collections;
 import java.util.Map;
@@ -55,8 +53,7 @@ class ConsistencyCheckJobTest {
         Map<String, Object> expectTableCheckPosition = 
Collections.singletonMap("t_order", 100);
         
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(checkJobId,
 0,
                 
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
-        ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob();
-        
Plugins.getMemberAccessor().invoke(AbstractPipelineJob.class.getDeclaredMethod("setJobId",
 String.class), consistencyCheckJob, checkJobId);
+        ConsistencyCheckJob consistencyCheckJob = new 
ConsistencyCheckJob(checkJobId);
         ConsistencyCheckJobItemContext actual = 
consistencyCheckJob.buildPipelineJobItemContext(
                 new ShardingContext(checkJobId, "", 1, 
YamlEngine.marshal(createYamlConsistencyCheckJobConfiguration(checkJobId)), 0, 
""));
         assertThat(actual.getProgressContext().getTableCheckPositions(), 
is(expectTableCheckPosition));

Reply via email to