This is an automated email from the ASF dual-hosted git repository.

wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e989837997a Use PipelineJobRunnerManager instead of AbstractPipelineJob (#29345)
e989837997a is described below

commit e989837997a582e3b8e8d3845d1e8e932a52669f
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 10 13:44:41 2023 +0800

    Use PipelineJobRunnerManager instead of AbstractPipelineJob (#29345)
---
 .../core/job/AbstractInseparablePipelineJob.java   | 13 ++++--
 .../core/job/AbstractSeparablePipelineJob.java     | 13 ++++--
 .../data/pipeline/core/job/PipelineJob.java        | 26 +++--------
 .../pipeline/core/job/PipelineJobRegistry.java     |  6 +--
 .../PipelineJobRunnerCleaner.java}                 | 31 +++----------
 .../PipelineJobRunnerManager.java}                 | 52 ++++++++++++++++------
 .../AbstractJobConfigurationChangedProcessor.java  | 14 +++---
 .../pipeline/core/job/PipelineJobRegistryTest.java |  9 ++--
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   | 13 ++----
 .../data/pipeline/cdc/api/CDCJobAPI.java           |  2 +-
 .../pipeline/cdc/engine/CDCJobRunnerCleaner.java}  | 41 +++++++----------
 .../consistencycheck/ConsistencyCheckJob.java      |  9 ++--
 ...tencyCheckJobConfigurationChangedProcessor.java |  4 +-
 .../pipeline/scenario/migration/MigrationJob.java  |  8 ++--
 .../engine/MigrationJobRunnerCleaner.java}         | 37 +++++----------
 .../MigrationJobConfigurationChangedProcessor.java |  6 +--
 16 files changed, 129 insertions(+), 155 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 31c4622c1d5..09af3e3d7a5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -17,12 +17,15 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job;
 
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
@@ -40,8 +43,12 @@ import java.util.concurrent.CompletableFuture;
  * 
  * @param <T> type of pipeline job item context
  */
+@RequiredArgsConstructor
+@Getter
 @Slf4j
-public abstract class AbstractInseparablePipelineJob<T extends 
PipelineJobItemContext> extends AbstractPipelineJob {
+public abstract class AbstractInseparablePipelineJob<T extends 
PipelineJobItemContext> implements PipelineJob {
+    
+    private final PipelineJobRunnerManager jobRunnerManager;
     
     @Override
     public final void execute(final ShardingContext shardingContext) {
@@ -51,12 +58,12 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobItemCo
         PipelineJobConfiguration jobConfig = 
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         Collection<T> jobItemContexts = new LinkedList<>();
         for (int shardingItem = 0; shardingItem < 
jobConfig.getJobShardingCount(); shardingItem++) {
-            if (isStopping()) {
+            if (jobRunnerManager.isStopping()) {
                 log.info("Stopping true, ignore");
                 return;
             }
             T jobItemContext = buildJobItemContext(jobConfig, shardingItem);
-            if (!addTasksRunner(shardingItem, 
buildTasksRunner(jobItemContext))) {
+            if (!jobRunnerManager.addTasksRunner(shardingItem, 
buildTasksRunner(jobItemContext))) {
                 continue;
             }
             jobItemContexts.add(jobItemContext);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 46044d11a27..ff1d04515da 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -17,11 +17,14 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job;
 
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -34,15 +37,19 @@ import java.sql.SQLException;
  * 
  * @param <T> type of pipeline job item context
  */
+@RequiredArgsConstructor
+@Getter
 @Slf4j
-public abstract class AbstractSeparablePipelineJob<T extends 
PipelineJobItemContext> extends AbstractPipelineJob {
+public abstract class AbstractSeparablePipelineJob<T extends 
PipelineJobItemContext> implements PipelineJob {
+    
+    private final PipelineJobRunnerManager jobRunnerManager;
     
     @Override
     public final void execute(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
         int shardingItem = shardingContext.getShardingItem();
         log.info("Execute job {}-{}", jobId, shardingItem);
-        if (isStopping()) {
+        if (jobRunnerManager.isStopping()) {
             log.info("Stopping true, ignore");
             return;
         }
@@ -60,7 +67,7 @@ public abstract class AbstractSeparablePipelineJob<T extends 
PipelineJobItemCont
         String jobId = jobItemContext.getJobId();
         int shardingItem = jobItemContext.getShardingItem();
         PipelineTasksRunner tasksRunner = buildTasksRunner(jobItemContext);
-        if (!addTasksRunner(shardingItem, tasksRunner)) {
+        if (!jobRunnerManager.addTasksRunner(shardingItem, tasksRunner)) {
             return;
         }
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId,
 shardingItem);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
index e71792d6f9a..a8368a566bf 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
@@ -17,34 +17,18 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job;
 
-import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
+import 
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 
-import java.util.Collection;
-import java.util.Optional;
-
 /**
  * Pipeline job.
  */
 public interface PipelineJob extends SimpleJob {
     
     /**
-     * Get tasks runner.
-     *
-     * @param shardingItem sharding item
-     * @return tasks runner
-     */
-    Optional<PipelineTasksRunner> getTasksRunner(int shardingItem);
-    
-    /**
-     * Get sharding items.
-     *
-     * @return sharding items
-     */
-    Collection<Integer> getShardingItems();
-    
-    /**
-     * Stop job.
+     * Get pipeline job runner manager.
+     * 
+     * @return pipeline job runner manager
      */
-    void stop();
+    PipelineJobRunnerManager getJobRunnerManager();
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistry.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistry.java
index 7d812cc1c21..f1660e2714e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistry.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistry.java
@@ -76,7 +76,7 @@ public final class PipelineJobRegistry {
         if (null == job) {
             return;
         }
-        job.stop();
+        job.getJobRunnerManager().stop();
         JOBS.remove(jobId);
     }
     
@@ -88,7 +88,7 @@ public final class PipelineJobRegistry {
      * @return pipeline job item context
      */
     public static Optional<PipelineJobItemContext> getItemContext(final String 
jobId, final int shardingItem) {
-        return JOBS.containsKey(jobId) ? 
JOBS.get(jobId).getTasksRunner(shardingItem).map(PipelineTasksRunner::getJobItemContext)
 : Optional.empty();
+        return JOBS.containsKey(jobId) ? 
JOBS.get(jobId).getJobRunnerManager().getTasksRunner(shardingItem).map(PipelineTasksRunner::getJobItemContext)
 : Optional.empty();
     }
     
     /**
@@ -98,6 +98,6 @@ public final class PipelineJobRegistry {
      * @return sharding items
      */
     public static Collection<Integer> getShardingItems(final String jobId) {
-        return JOBS.containsKey(jobId) ? JOBS.get(jobId).getShardingItems() : 
Collections.emptyList();
+        return JOBS.containsKey(jobId) ? 
JOBS.get(jobId).getJobRunnerManager().getShardingItems() : 
Collections.emptyList();
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerCleaner.java
similarity index 54%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
copy to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerCleaner.java
index e71792d6f9a..a0a572c2629 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerCleaner.java
@@ -15,36 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.job;
-
-import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-
-import java.util.Collection;
-import java.util.Optional;
+package org.apache.shardingsphere.data.pipeline.core.job.engine;
 
 /**
- * Pipeline job.
+ * Pipeline job runner cleaner.
  */
-public interface PipelineJob extends SimpleJob {
-    
-    /**
-     * Get tasks runner.
-     *
-     * @param shardingItem sharding item
-     * @return tasks runner
-     */
-    Optional<PipelineTasksRunner> getTasksRunner(int shardingItem);
-    
-    /**
-     * Get sharding items.
-     *
-     * @return sharding items
-     */
-    Collection<Integer> getShardingItems();
+public interface PipelineJobRunnerCleaner {
     
     /**
-     * Stop job.
+     * Clean pipeline job.
      */
-    void stop();
+    void clean();
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
similarity index 82%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
index cc07cae08ae..d3bd6879c90 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.job;
+package org.apache.shardingsphere.data.pipeline.core.job.engine;
 
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
@@ -38,10 +39,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Abstract pipeline job.
+ * Pipeline job runner manager.
  */
+@RequiredArgsConstructor
 @Slf4j
-public abstract class AbstractPipelineJob implements PipelineJob {
+public final class PipelineJobRunnerManager {
     
     private static final long JOB_WAITING_TIMEOUT_MILLS = 2000L;
     
@@ -51,6 +53,12 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     
     private final Map<Integer, PipelineTasksRunner> tasksRunners = new 
ConcurrentHashMap<>();
     
+    private final PipelineJobRunnerCleaner cleaner;
+    
+    public PipelineJobRunnerManager() {
+        this(null);
+    }
+    
     /**
      * Is stopping.
      *
@@ -69,17 +77,33 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
         this.jobBootstrap.set(jobBootstrap);
     }
     
-    @Override
-    public final Optional<PipelineTasksRunner> getTasksRunner(final int 
shardingItem) {
+    /**
+     * Get tasks runner.
+     *
+     * @param shardingItem sharding item
+     * @return tasks runner
+     */
+    public Optional<PipelineTasksRunner> getTasksRunner(final int 
shardingItem) {
         return Optional.ofNullable(tasksRunners.get(shardingItem));
     }
     
-    @Override
-    public final Collection<Integer> getShardingItems() {
+    /**
+     * Get sharding items.
+     *
+     * @return sharding items
+     */
+    public Collection<Integer> getShardingItems() {
         return new ArrayList<>(tasksRunners.keySet());
     }
     
-    protected final boolean addTasksRunner(final int shardingItem, final 
PipelineTasksRunner tasksRunner) {
+    /**
+     * Add tasks runner.
+     * 
+     * @param shardingItem sharding item
+     * @param tasksRunner tasks runner
+     * @return add success or not
+     */
+    public boolean addTasksRunner(final int shardingItem, final 
PipelineTasksRunner tasksRunner) {
         if (null != tasksRunners.putIfAbsent(shardingItem, tasksRunner)) {
             log.warn("shardingItem {} tasks runner exists, ignore", 
shardingItem);
             return false;
@@ -90,8 +114,10 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
         return true;
     }
     
-    @Override
-    public final void stop() {
+    /**
+     * Stop job.
+     */
+    public void stop() {
         Optional<String> jobId = 
tasksRunners.values().stream().findFirst().map(each -> 
each.getJobItemContext().getJobId());
         try {
             stopping.set(true);
@@ -104,7 +130,9 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
         } finally {
             jobId.ifPresent(PipelineJobProgressPersistService::remove);
             tasksRunners.values().stream().map(each -> 
each.getJobItemContext().getJobProcessContext()).forEach(QuietlyCloser::close);
-            clean();
+            if (null != cleaner) {
+                cleaner.clean();
+            }
         }
     }
     
@@ -128,6 +156,4 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
             spentMills += sleepMillis;
         }
     }
-    
-    protected abstract void clean();
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
index f128bb4d6b3..38c7b9cfad1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
@@ -18,14 +18,14 @@
 package 
org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.impl;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
+import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.JobConfigurationChangedProcessor;
 import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
@@ -80,14 +80,14 @@ public abstract class 
AbstractJobConfigurationChangedProcessor implements JobCon
     
     protected void executeJob(final JobConfiguration jobConfig) {
         String jobId = jobConfig.getJobName();
-        AbstractPipelineJob job = buildPipelineJob(jobId);
+        PipelineJob job = buildPipelineJob(jobId);
         PipelineJobRegistry.add(jobId, job);
         OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
 job, jobConfig);
-        job.setJobBootstrap(oneOffJobBootstrap);
+        job.getJobRunnerManager().setJobBootstrap(oneOffJobBootstrap);
         oneOffJobBootstrap.execute();
     }
     
-    protected abstract AbstractPipelineJob buildPipelineJob(String jobId);
+    protected abstract PipelineJob buildPipelineJob(String jobId);
     
     protected abstract PipelineJobType getJobType();
     
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistryTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistryTest.java
index adbca271a20..0ca79713d4e 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistryTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistryTest.java
@@ -23,6 +23,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Answers;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
@@ -42,7 +43,7 @@ import static org.mockito.Mockito.when;
 @ExtendWith(MockitoExtension.class)
 class PipelineJobRegistryTest {
     
-    @Mock
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private PipelineJob job;
     
     @BeforeEach
@@ -75,7 +76,7 @@ class PipelineJobRegistryTest {
     @Test
     void assertStop() {
         PipelineJobRegistry.stop("foo_job");
-        verify(job).stop();
+        verify(job.getJobRunnerManager()).stop();
         assertFalse(PipelineJobRegistry.isExisting("foo_job"));
     }
     
@@ -84,7 +85,7 @@ class PipelineJobRegistryTest {
         PipelineJobItemContext jobItemContext = 
mock(PipelineJobItemContext.class);
         PipelineTasksRunner tasksRunner = mock(PipelineTasksRunner.class);
         when(tasksRunner.getJobItemContext()).thenReturn(jobItemContext);
-        
when(job.getTasksRunner(anyInt())).thenReturn(Optional.of(tasksRunner));
+        
when(job.getJobRunnerManager().getTasksRunner(anyInt())).thenReturn(Optional.of(tasksRunner));
         Optional<PipelineJobItemContext> actual = 
PipelineJobRegistry.getItemContext("foo_job", 1);
         assertTrue(actual.isPresent());
         assertThat(actual.get(), is(jobItemContext));
@@ -97,7 +98,7 @@ class PipelineJobRegistryTest {
     
     @Test
     void assertGetExistedShardingItems() {
-        when(job.getShardingItems()).thenReturn(Arrays.asList(1, 2, 3));
+        
when(job.getJobRunnerManager().getShardingItems()).thenReturn(Arrays.asList(1, 
2, 3));
         assertThat(PipelineJobRegistry.getShardingItems("foo_job"), 
is(Arrays.asList(1, 2, 3)));
     }
     
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 2e782170459..fdaed744821 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -29,13 +29,13 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
+import org.apache.shardingsphere.data.pipeline.cdc.engine.CDCJobRunnerCleaner;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceConfigurationFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
@@ -48,6 +48,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
@@ -59,7 +60,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAl
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.util.Collection;
 import java.util.Map;
@@ -83,11 +83,12 @@ public final class CDCJob extends 
AbstractInseparablePipelineJob<CDCJobItemConte
     
     private final PipelineProcessConfigurationPersistService 
processConfigPersistService;
     
-    private final PipelineDataSourceManager dataSourceManager;
+    private final DefaultPipelineDataSourceManager dataSourceManager;
     
     private final CDCJobPreparer jobPreparer;
     
     public CDCJob(final PipelineSink sink) {
+        super(new PipelineJobRunnerManager(new CDCJobRunnerCleaner(sink)));
         this.sink = sink;
         jobAPI = (CDCJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
         jobItemManager = new PipelineJobItemManager<>(new 
CDCJobType().getYamlJobItemProgressSwapper());
@@ -157,12 +158,6 @@ public final class CDCJob extends 
AbstractInseparablePipelineJob<CDCJobItemConte
         ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental", 
jobItemContexts.iterator().next()));
     }
     
-    @Override
-    protected void clean() {
-        dataSourceManager.close();
-        QuietlyCloser.close(sink);
-    }
-    
     @RequiredArgsConstructor
     private final class CDCExecuteCallback implements ExecuteCallback {
         
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 4d1409798c1..0398be69743 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -217,7 +217,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
         enable(jobId);
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
 job, jobConfigPOJO.toJobConfiguration());
-        job.setJobBootstrap(oneOffJobBootstrap);
+        job.getJobRunnerManager().setJobBootstrap(oneOffJobBootstrap);
         oneOffJobBootstrap.execute();
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/engine/CDCJobRunnerCleaner.java
similarity index 50%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
copy to kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/engine/CDCJobRunnerCleaner.java
index e71792d6f9a..e02c3343438 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/engine/CDCJobRunnerCleaner.java
@@ -15,36 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.job;
+package org.apache.shardingsphere.data.pipeline.cdc.engine;
 
-import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-
-import java.util.Collection;
-import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
+import 
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerCleaner;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 /**
- * Pipeline job.
+ * CDC job runner cleaner.
  */
-public interface PipelineJob extends SimpleJob {
+@RequiredArgsConstructor
+public final class CDCJobRunnerCleaner implements PipelineJobRunnerCleaner {
     
-    /**
-     * Get tasks runner.
-     *
-     * @param shardingItem sharding item
-     * @return tasks runner
-     */
-    Optional<PipelineTasksRunner> getTasksRunner(int shardingItem);
+    private final DefaultPipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
     
-    /**
-     * Get sharding items.
-     *
-     * @return sharding items
-     */
-    Collection<Integer> getShardingItems();
+    private final PipelineSink sink;
     
-    /**
-     * Stop job.
-     */
-    void stop();
+    @Override
+    public void clean() {
+        dataSourceManager.close();
+        QuietlyCloser.close(sink);
+    }
 }
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 5eedb8770e4..7cf1a25af59 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -35,6 +36,10 @@ import java.util.Optional;
  */
 public final class ConsistencyCheckJob extends 
AbstractSeparablePipelineJob<ConsistencyCheckJobItemContext> {
     
+    public ConsistencyCheckJob() {
+        super(new PipelineJobRunnerManager());
+    }
+    
     @Override
     public ConsistencyCheckJobItemContext buildJobItemContext(final 
ShardingContext shardingContext) {
         ConsistencyCheckJobConfiguration jobConfig = new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
@@ -51,8 +56,4 @@ public final class ConsistencyCheckJob extends 
AbstractSeparablePipelineJob<Cons
     @Override
     protected void doPrepare(final ConsistencyCheckJobItemContext 
jobItemContext) {
     }
-    
-    @Override
-    protected void clean() {
-    }
 }
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
index 954e63fe640..e57d0066a76 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
@@ -18,9 +18,9 @@
 package 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metadata.processor;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.impl.AbstractJobConfigurationChangedProcessor;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
@@ -36,7 +36,7 @@ public final class 
ConsistencyCheckJobConfigurationChangedProcessor extends Abst
     }
     
     @Override
-    protected AbstractPipelineJob buildPipelineJob(final String jobId) {
+    protected PipelineJob buildPipelineJob(final String jobId) {
         return new ConsistencyCheckJob();
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 56a4463de5a..0e6d74270fb 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -28,6 +28,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfigurati
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
+import 
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
@@ -45,6 +46,7 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.engine.MigrationJobRunnerCleaner;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.MigrationIncrementalDumperContextCreator;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -75,6 +77,7 @@ public final class MigrationJob extends 
AbstractSeparablePipelineJob<MigrationJo
     private final MigrationJobPreparer jobPreparer;
     
     public MigrationJob() {
+        super(new PipelineJobRunnerManager(new MigrationJobRunnerCleaner()));
         jobItemManager = new PipelineJobItemManager<>(new 
MigrationJobType().getYamlJobItemProgressSwapper());
         processConfigPersistService = new 
PipelineProcessConfigurationPersistService();
         dataSourceManager = new DefaultPipelineDataSourceManager();
@@ -135,9 +138,4 @@ public final class MigrationJob extends 
AbstractSeparablePipelineJob<MigrationJo
     protected void doPrepare(final MigrationJobItemContext jobItemContext) 
throws SQLException {
         jobPreparer.prepare(jobItemContext);
     }
-    
-    @Override
-    public void clean() {
-        dataSourceManager.close();
-    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/engine/MigrationJobRunnerCleaner.java
similarity index 54%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
copy to 
kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/engine/MigrationJobRunnerCleaner.java
index e71792d6f9a..48400463c6f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/engine/MigrationJobRunnerCleaner.java
@@ -15,36 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.job;
+package org.apache.shardingsphere.data.pipeline.scenario.migration.engine;
 
-import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-
-import java.util.Collection;
-import java.util.Optional;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerCleaner;
 
 /**
- * Pipeline job.
+ * Migration job runner cleaner.
  */
-public interface PipelineJob extends SimpleJob {
-    
-    /**
-     * Get tasks runner.
-     *
-     * @param shardingItem sharding item
-     * @return tasks runner
-     */
-    Optional<PipelineTasksRunner> getTasksRunner(int shardingItem);
+public final class MigrationJobRunnerCleaner implements 
PipelineJobRunnerCleaner {
     
-    /**
-     * Get sharding items.
-     *
-     * @return sharding items
-     */
-    Collection<Integer> getShardingItems();
+    private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
     
-    /**
-     * Stop job.
-     */
-    void stop();
+    @Override
+    public void clean() {
+        dataSourceManager.close();
+    }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
index eb7266b676e..f9e57077ac5 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
@@ -18,13 +18,13 @@
 package 
org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.impl.AbstractJobConfigurationChangedProcessor;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 
 /**
@@ -39,7 +39,7 @@ public final class MigrationJobConfigurationChangedProcessor 
extends AbstractJob
     }
     
     @Override
-    protected AbstractPipelineJob buildPipelineJob(final String jobId) {
+    protected PipelineJob buildPipelineJob(final String jobId) {
         return new MigrationJob();
     }
     


Reply via email to