This is an automated email from the ASF dual-hosted git repository.

wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 260635d8fc4 Remove AbstractPipelineJob.jobType and jobId (#29344)
260635d8fc4 is described below

commit 260635d8fc488eaa85609de03f9a03a24da6ac2d
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 9 23:17:11 2023 +0800

    Remove AbstractPipelineJob.jobType and jobId (#29344)
---
 .../core/job/AbstractInseparablePipelineJob.java     |  5 -----
 .../data/pipeline/core/job/AbstractPipelineJob.java  | 20 +++-----------------
 .../core/job/AbstractSeparablePipelineJob.java       |  4 ----
 .../shardingsphere/data/pipeline/cdc/CDCJob.java     |  3 +--
 .../data/pipeline/cdc/api/CDCJobAPI.java             |  2 +-
 .../consistencycheck/ConsistencyCheckJob.java        |  4 ----
 ...istencyCheckJobConfigurationChangedProcessor.java |  2 +-
 .../pipeline/scenario/migration/MigrationJob.java    |  3 +--
 .../MigrationJobConfigurationChangedProcessor.java   |  2 +-
 .../consistencycheck/ConsistencyCheckJobTest.java    |  2 +-
 10 files changed, 9 insertions(+), 38 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 1566479ee33..31c4622c1d5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -43,10 +43,6 @@ import java.util.concurrent.CompletableFuture;
 @Slf4j
 public abstract class AbstractInseparablePipelineJob<T extends 
PipelineJobItemContext> extends AbstractPipelineJob {
     
-    protected AbstractInseparablePipelineJob(final String jobId) {
-        super(jobId);
-    }
-    
     @Override
     public final void execute(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
@@ -130,7 +126,6 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobItemCo
     }
     
     private void executeIncrementalTasks(final PipelineJobType jobType, final 
Collection<T> jobItemContexts) {
-        log.info("Execute incremental tasks, jobId={}", getJobId());
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
         for (T each : jobItemContexts) {
             if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 75b09d70ad4..cc07cae08ae 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -17,12 +17,9 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job;
 
-import lombok.AccessLevel;
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -30,7 +27,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarr
 import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
 import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.util.ArrayList;
@@ -49,23 +45,12 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     
     private static final long JOB_WAITING_TIMEOUT_MILLS = 2000L;
     
-    @Getter
-    private final String jobId;
-    
-    @Getter(AccessLevel.PROTECTED)
-    private final PipelineJobType jobType;
-    
     private final AtomicBoolean stopping = new AtomicBoolean(false);
     
     private final AtomicReference<JobBootstrap> jobBootstrap = new 
AtomicReference<>();
     
     private final Map<Integer, PipelineTasksRunner> tasksRunners = new 
ConcurrentHashMap<>();
     
-    protected AbstractPipelineJob(final String jobId) {
-        this.jobId = jobId;
-        jobType = TypedSPILoader.getService(PipelineJobType.class, 
PipelineJobIdUtils.parseJobType(jobId).getType());
-    }
-    
     /**
      * Is stopping.
      *
@@ -107,16 +92,17 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     
     @Override
     public final void stop() {
+        Optional<String> jobId = 
tasksRunners.values().stream().findFirst().map(each -> 
each.getJobItemContext().getJobId());
         try {
             stopping.set(true);
             log.info("Stop tasks runner, jobId={}", jobId);
             tasksRunners.values().forEach(PipelineTasksRunner::stop);
-            awaitJobStopped(jobId);
+            jobId.ifPresent(this::awaitJobStopped);
             if (null != jobBootstrap.get()) {
                 jobBootstrap.get().shutdown();
             }
         } finally {
-            PipelineJobProgressPersistService.remove(jobId);
+            jobId.ifPresent(PipelineJobProgressPersistService::remove);
             tasksRunners.values().stream().map(each -> 
each.getJobItemContext().getJobProcessContext()).forEach(QuietlyCloser::close);
             clean();
         }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 2c83e033062..46044d11a27 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -37,10 +37,6 @@ import java.sql.SQLException;
 @Slf4j
 public abstract class AbstractSeparablePipelineJob<T extends 
PipelineJobItemContext> extends AbstractPipelineJob {
     
-    protected AbstractSeparablePipelineJob(final String jobId) {
-        super(jobId);
-    }
-    
     @Override
     public final void execute(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 133ac166992..2e782170459 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -87,8 +87,7 @@ public final class CDCJob extends 
AbstractInseparablePipelineJob<CDCJobItemConte
     
     private final CDCJobPreparer jobPreparer;
     
-    public CDCJob(final String jobId, final PipelineSink sink) {
-        super(jobId);
+    public CDCJob(final PipelineSink sink) {
         this.sink = sink;
         jobAPI = (CDCJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
         jobItemManager = new PipelineJobItemManager<>(new 
CDCJobType().getYamlJobItemProgressSwapper());
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index eec99b71fbc..4d1409798c1 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -212,7 +212,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
      * @param sink sink
      */
     public void start(final String jobId, final PipelineSink sink) {
-        CDCJob job = new CDCJob(jobId, sink);
+        CDCJob job = new CDCJob(sink);
         PipelineJobRegistry.add(jobId, job);
         enable(jobId);
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index df48a9815cc..5eedb8770e4 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -35,10 +35,6 @@ import java.util.Optional;
  */
 public final class ConsistencyCheckJob extends 
AbstractSeparablePipelineJob<ConsistencyCheckJobItemContext> {
     
-    public ConsistencyCheckJob(final String jobId) {
-        super(jobId);
-    }
-    
     @Override
     public ConsistencyCheckJobItemContext buildJobItemContext(final 
ShardingContext shardingContext) {
         ConsistencyCheckJobConfiguration jobConfig = new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
index b41748418a6..954e63fe640 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
@@ -37,7 +37,7 @@ public final class 
ConsistencyCheckJobConfigurationChangedProcessor extends Abst
     
     @Override
     protected AbstractPipelineJob buildPipelineJob(final String jobId) {
-        return new ConsistencyCheckJob(jobId);
+        return new ConsistencyCheckJob();
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 9ed45a30c28..56a4463de5a 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -74,8 +74,7 @@ public final class MigrationJob extends 
AbstractSeparablePipelineJob<MigrationJo
     // Shared by all sharding items
     private final MigrationJobPreparer jobPreparer;
     
-    public MigrationJob(final String jobId) {
-        super(jobId);
+    public MigrationJob() {
         jobItemManager = new PipelineJobItemManager<>(new 
MigrationJobType().getYamlJobItemProgressSwapper());
         processConfigPersistService = new 
PipelineProcessConfigurationPersistService();
         dataSourceManager = new DefaultPipelineDataSourceManager();
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
index e8c2c0b0240..eb7266b676e 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
@@ -40,7 +40,7 @@ public final class MigrationJobConfigurationChangedProcessor 
extends AbstractJob
     
     @Override
     protected AbstractPipelineJob buildPipelineJob(final String jobId) {
-        return new MigrationJob(jobId);
+        return new MigrationJob();
     }
     
     @Override
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 3b91f6ffc0e..a1aa5dd694b 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -54,7 +54,7 @@ class ConsistencyCheckJobTest {
         Map<String, Object> expectTableCheckPosition = 
Collections.singletonMap("t_order", 100);
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId,
 0,
                 
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
-        ConsistencyCheckJob consistencyCheckJob = new 
ConsistencyCheckJob(checkJobId);
+        ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob();
         ConsistencyCheckJobItemContext actual = 
consistencyCheckJob.buildJobItemContext(
                 new ShardingContext(checkJobId, "", 1, 
YamlEngine.marshal(createYamlConsistencyCheckJobConfiguration(checkJobId)), 0, 
""));
         assertThat(actual.getProgressContext().getSourceTableCheckPositions(), 
is(expectTableCheckPosition));

Reply via email to