This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d801647117e Add PipelineJobExecutor (#32762)
d801647117e is described below

commit d801647117ecc468f97d7e1b33ef7e8f1ba996b1
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Sep 1 20:20:15 2024 +0800

    Add PipelineJobExecutor (#32762)
    
    * Add PipelineJobExecutor
    
    * Add PipelineJobExecutor
    
    * Add PipelineJobExecutor
    
    * Add PipelineJobExecutor
---
 .../DistributedPipelineJobExecutor.java}           | 53 +++++++-------
 .../DistributedPipelineJobExecutorCallback.java    | 65 +++++++++++++++++
 .../consistencycheck/ConsistencyCheckJob.java      | 29 ++++----
 ...va => ConsistencyCheckJobExecutorCallback.java} | 18 +++--
 .../pipeline/scenario/migration/MigrationJob.java  | 85 +++-------------------
 ...nJob.java => MigrationJobExecutorCallback.java} | 22 +++---
 ...> ConsistencyCheckJobExecutorCallbackTest.java} |  8 +-
 7 files changed, 141 insertions(+), 139 deletions(-)
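
Taken together, the change replaces the template-method base class AbstractSeparablePipelineJob with a shared, final DistributedPipelineJobExecutor that each scenario job composes, moving the scenario-specific hooks behind the new DistributedPipelineJobExecutorCallback interface. As a rough sketch of the resulting shape, assembled from the MigrationJob hunks below (a field initializer stands in for the constructor used in the actual diff):

    import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
    import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
    import org.apache.shardingsphere.data.pipeline.core.job.executor.DistributedPipelineJobExecutor;
    import org.apache.shardingsphere.elasticjob.api.ShardingContext;
    
    // A scenario job is now a thin shell: the lifecycle logic lives in the
    // shared executor, and scenario specifics live in the callback it wraps.
    public final class MigrationJob implements PipelineJob {
        
        private final DistributedPipelineJobExecutor executor = new DistributedPipelineJobExecutor(new MigrationJobExecutorCallback());
        
        @Override
        public PipelineJobRunnerManager getJobRunnerManager() {
            return executor.getJobRunnerManager();
        }
        
        @Override
        public void execute(final ShardingContext shardingContext) {
            executor.execute(shardingContext);
        }
    }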

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutor.java
similarity index 74%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutor.java
index 7a021fdf93b..a18639f6f83 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutor.java
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.job;
+package org.apache.shardingsphere.data.pipeline.core.job.executor;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
@@ -41,25 +42,25 @@ import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import java.sql.SQLException;
 
 /**
- * Abstract separable pipeline job.
- * 
- * @param <T> type of pipeline job configuration
- * @param <I> type of pipeline job item context
- * @param <P> type of pipeline job item progress
+ * Distributed pipeline job executor.
  */
-@Getter
+@RequiredArgsConstructor
 @Slf4j
-public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfiguration, I extends PipelineJobItemContext, P extends PipelineJobItemProgress> implements PipelineJob {
+public final class DistributedPipelineJobExecutor {
     
-    private final PipelineJobRunnerManager jobRunnerManager;
+    @SuppressWarnings("rawtypes")
+    private final DistributedPipelineJobExecutorCallback callback;
     
-    protected AbstractSeparablePipelineJob() {
-        jobRunnerManager = new PipelineJobRunnerManager();
-    }
+    @Getter
+    private final PipelineJobRunnerManager jobRunnerManager = new PipelineJobRunnerManager();
     
+    /**
+     * Execute job.
+     *
+     * @param shardingContext sharding context
+     */
     @SuppressWarnings("unchecked")
-    @Override
-    public final void execute(final ShardingContext shardingContext) {
+    public void execute(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
         int shardingItem = shardingContext.getShardingItem();
         log.info("Execute job {}-{}.", jobId, shardingItem);
@@ -69,14 +70,14 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
         }
         PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
         PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
-        T jobConfig = (T) jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        PipelineJobItemManager<P> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
-        P jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
+        PipelineJobConfiguration jobConfig = jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+        PipelineJobItemManager<PipelineJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+        PipelineJobItemProgress jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
         TransmissionProcessContext jobProcessContext = createTransmissionProcessContext(jobId, jobType, contextKey);
         PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
         boolean started = false;
         try {
-            started = execute(buildJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext), governanceFacade);
+            started = execute(callback.buildJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext, jobRunnerManager.getDataSourceManager()), governanceFacade);
             if (started) {
                 PipelineJobProgressPersistService.persistNow(jobId, shardingItem);
             }
@@ -95,9 +96,10 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
         }
     }
     
-    private boolean execute(final I jobItemContext, final PipelineGovernanceFacade governanceFacade) {
+    @SuppressWarnings("unchecked")
+    private boolean execute(final PipelineJobItemContext jobItemContext, final PipelineGovernanceFacade governanceFacade) {
         int shardingItem = jobItemContext.getShardingItem();
-        PipelineTasksRunner tasksRunner = buildTasksRunner(jobItemContext);
+        PipelineTasksRunner tasksRunner = callback.buildTasksRunner(jobItemContext);
         if (!jobRunnerManager.addTasksRunner(shardingItem, tasksRunner)) {
             return false;
         }
@@ -117,19 +119,14 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
         return new TransmissionProcessContext(jobId, processConfig);
     }
     
-    protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P jobItemProgress, TransmissionProcessContext jobProcessContext);
-    
-    protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
-    
-    protected final void prepare(final I jobItemContext) {
+    @SuppressWarnings("unchecked")
+    private void prepare(final PipelineJobItemContext jobItemContext) {
         try {
-            doPrepare(jobItemContext);
+            callback.prepare(jobItemContext);
             // CHECKSTYLE:OFF
         } catch (final SQLException ex) {
             // CHECKSTYLE:ON
             throw new PipelineInternalException(ex);
         }
     }
-    
-    protected abstract void doPrepare(I jobItemContext) throws SQLException;
 }
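
With the generics gone from the executor itself, the raw callback now drives every scenario-specific step. A simplified reconstruction of the resulting control flow in execute(), pieced together from the hunks above (logging, governance handling, and the unchanged code outside these hunks are omitted or assumed):

    // Sketch only; not a verbatim excerpt of DistributedPipelineJobExecutor.
    PipelineJobConfiguration jobConfig = jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
    PipelineJobItemProgress jobItemProgress = jobItemManager.getProgress(jobId, shardingItem).orElse(null);
    PipelineJobItemContext jobItemContext = callback.buildJobItemContext(
            jobConfig, shardingItem, jobItemProgress, jobProcessContext, jobRunnerManager.getDataSourceManager());
    PipelineTasksRunner tasksRunner = callback.buildTasksRunner(jobItemContext);
    if (jobRunnerManager.addTasksRunner(shardingItem, tasksRunner)) {
        prepare(jobItemContext);  // wraps callback.prepare(), rethrowing SQLException as PipelineInternalException
        PipelineJobProgressPersistService.persistNow(jobId, shardingItem);
    }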
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutorCallback.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutorCallback.java
new file mode 100644
index 00000000000..54ca60c0368
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutorCallback.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.executor;
+
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
+
+import java.sql.SQLException;
+
+/**
+ * Distributed pipeline job executor callback.
+ * 
+ * @param <T> type of pipeline job configuration
+ * @param <I> type of pipeline job item context
+ * @param <P> type of pipeline job item progress
+ */
+public interface DistributedPipelineJobExecutorCallback<T extends PipelineJobConfiguration, I extends PipelineJobItemContext, P extends PipelineJobItemProgress> {
+    
+    /**
+     * Build job item context.
+     *
+     * @param jobConfig job configuration
+     * @param shardingItem sharding item
+     * @param jobItemProgress job item progress
+     * @param jobProcessContext job process context
+     * @param dataSourceManager pipeline data source manager
+     * @return built job item context
+     */
+    I buildJobItemContext(T jobConfig, int shardingItem, P jobItemProgress, TransmissionProcessContext jobProcessContext, PipelineDataSourceManager dataSourceManager);
+    
+    /**
+     * Build tasks runner.
+     *
+     * @param jobItemContext job item context
+     * @return built tasks runner
+     */
+    PipelineTasksRunner buildTasksRunner(I jobItemContext);
+    
+    /**
+     * Prepare.
+     *
+     * @param jobItemContext job item context
+     * @throws SQLException SQL exception
+     */
+    void prepare(I jobItemContext) throws SQLException;
+}
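
These are exactly the three hooks the deleted abstract class exposed, with one signature change: buildJobItemContext now receives the PipelineDataSourceManager explicitly, because a callback has no job runner manager of its own to fetch it from. A hedged illustration of what a scenario module would supply; the Foo* types are placeholders, not classes from this commit:

    // Hypothetical callback: FooJobConfiguration, FooJobItemContext,
    // FooJobItemProgress, and FooTasksRunner stand in for a real scenario's types.
    public final class FooJobExecutorCallback implements DistributedPipelineJobExecutorCallback<FooJobConfiguration, FooJobItemContext, FooJobItemProgress> {
        
        @Override
        public FooJobItemContext buildJobItemContext(final FooJobConfiguration jobConfig, final int shardingItem, final FooJobItemProgress jobItemProgress,
                                                     final TransmissionProcessContext jobProcessContext, final PipelineDataSourceManager dataSourceManager) {
            return new FooJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext, dataSourceManager);
        }
        
        @Override
        public PipelineTasksRunner buildTasksRunner(final FooJobItemContext jobItemContext) {
            return new FooTasksRunner(jobItemContext);
        }
        
        @Override
        public void prepare(final FooJobItemContext jobItemContext) throws SQLException {
            // Intentionally empty when a scenario needs no preparation,
            // as in the consistency check callback later in this commit.
        }
    }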
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index fca9649325c..28a2cf5683a 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -17,32 +17,29 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
-import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task.ConsistencyCheckTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
+import org.apache.shardingsphere.data.pipeline.core.job.executor.DistributedPipelineJobExecutor;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 
 /**
  * Consistency check job.
  */
-public final class ConsistencyCheckJob extends AbstractSeparablePipelineJob<ConsistencyCheckJobConfiguration, ConsistencyCheckJobItemContext, ConsistencyCheckJobItemProgress> {
+public final class ConsistencyCheckJob implements PipelineJob {
     
-    @Override
-    public ConsistencyCheckJobItemContext buildJobItemContext(final ConsistencyCheckJobConfiguration jobConfig,
-                                                              final int shardingItem, final ConsistencyCheckJobItemProgress jobItemProgress, final TransmissionProcessContext jobProcessContext) {
-        return new ConsistencyCheckJobItemContext(jobConfig, shardingItem, JobStatus.RUNNING, jobItemProgress);
+    private final DistributedPipelineJobExecutor executor;
+    
+    public ConsistencyCheckJob() {
+        executor = new DistributedPipelineJobExecutor(new ConsistencyCheckJobExecutorCallback());
     }
     
     @Override
-    protected PipelineTasksRunner buildTasksRunner(final ConsistencyCheckJobItemContext jobItemContext) {
-        return new ConsistencyCheckTasksRunner(jobItemContext);
+    public PipelineJobRunnerManager getJobRunnerManager() {
+        return executor.getJobRunnerManager();
     }
     
     @Override
-    protected void doPrepare(final ConsistencyCheckJobItemContext jobItemContext) {
+    public void execute(final ShardingContext shardingContext) {
+        executor.execute(shardingContext);
     }
 }
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallback.java
similarity index 67%
copy from kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
copy to kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallback.java
index fca9649325c..456cf20e326 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallback.java
@@ -18,8 +18,9 @@
 package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.job.executor.DistributedPipelineJobExecutorCallback;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -27,22 +28,25 @@ import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task.ConsistencyCheckTasksRunner;
 
 /**
- * Consistency check job.
+ * Consistency check job executor callback.
  */
-public final class ConsistencyCheckJob extends AbstractSeparablePipelineJob<ConsistencyCheckJobConfiguration, ConsistencyCheckJobItemContext, ConsistencyCheckJobItemProgress> {
+public final class ConsistencyCheckJobExecutorCallback
+        implements
+            DistributedPipelineJobExecutorCallback<ConsistencyCheckJobConfiguration, ConsistencyCheckJobItemContext, ConsistencyCheckJobItemProgress> {
     
     @Override
-    public ConsistencyCheckJobItemContext buildJobItemContext(final ConsistencyCheckJobConfiguration jobConfig,
-                                                              final int shardingItem, final ConsistencyCheckJobItemProgress jobItemProgress, final TransmissionProcessContext jobProcessContext) {
+    public ConsistencyCheckJobItemContext buildJobItemContext(final ConsistencyCheckJobConfiguration jobConfig, final int shardingItem,
+                                                              final ConsistencyCheckJobItemProgress jobItemProgress, final TransmissionProcessContext jobProcessContext,
+                                                              final PipelineDataSourceManager dataSourceManager) {
         return new ConsistencyCheckJobItemContext(jobConfig, shardingItem, JobStatus.RUNNING, jobItemProgress);
     }
     
     @Override
-    protected PipelineTasksRunner buildTasksRunner(final ConsistencyCheckJobItemContext jobItemContext) {
+    public PipelineTasksRunner buildTasksRunner(final ConsistencyCheckJobItemContext jobItemContext) {
         return new ConsistencyCheckTasksRunner(jobItemContext);
     }
     
     @Override
-    protected void doPrepare(final ConsistencyCheckJobItemContext jobItemContext) {
+    public void prepare(final ConsistencyCheckJobItemContext jobItemContext) {
     }
 }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index aea9d599519..fe970619b40 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -17,90 +17,29 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
-import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner;
-import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.MigrationIncrementalDumperContextCreator;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
-import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
-
-import java.sql.SQLException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
+import org.apache.shardingsphere.data.pipeline.core.job.executor.DistributedPipelineJobExecutor;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 
 /**
  * Migration job.
  */
-public final class MigrationJob extends AbstractSeparablePipelineJob<MigrationJobConfiguration, MigrationJobItemContext, TransmissionJobItemProgress> {
-    
-    private final MigrationJobPreparer jobPreparer = new MigrationJobPreparer();
-    
-    @Override
-    protected MigrationJobItemContext buildJobItemContext(final MigrationJobConfiguration jobConfig,
-                                                          final int shardingItem, final TransmissionJobItemProgress jobItemProgress, final TransmissionProcessContext jobProcessContext) {
-        MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getProcessConfiguration());
-        return new MigrationJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager());
-    }
+public final class MigrationJob implements PipelineJob {
     
-    private MigrationTaskConfiguration buildTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
-        IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
-        Collection<CreateTableConfiguration> createTableConfigs = buildCreateTableConfigurations(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
-        Set<CaseInsensitiveIdentifier> targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
-        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
-                ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
-        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
-        return new MigrationTaskConfiguration(incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig);
-    }
-    
-    private Collection<CreateTableConfiguration> buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper mapper) {
-        return jobConfig.getTablesFirstDataNodes().getEntries().stream().map(each -> getCreateTableConfiguration(jobConfig, mapper, each)).collect(Collectors.toList());
-    }
-    
-    private CreateTableConfiguration getCreateTableConfiguration(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper mapper, final JobDataNodeEntry jobDataNodeEntry) {
-        DataNode dataNode = jobDataNodeEntry.getDataNodes().get(0);
-        PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSources().get(dataNode.getDataSourceName());
-        String sourceSchemaName = mapper.getSchemaName(jobDataNodeEntry.getLogicTableName());
-        String targetSchemaName = new DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData().isSchemaAvailable() ? sourceSchemaName : null;
-        return new CreateTableConfiguration(sourceDataSourceConfig, new CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()),
-                jobConfig.getTarget(), new CaseInsensitiveQualifiedTable(targetSchemaName, jobDataNodeEntry.getLogicTableName()));
-    }
+    private final DistributedPipelineJobExecutor executor;
     
-    private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
-                                                             final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper mapper) {
-        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
-        JobRateLimitAlgorithm writeRateLimitAlgorithm = new TransmissionProcessContext(jobConfig.getJobId(), pipelineProcessConfig).getWriteRateLimitAlgorithm();
-        int retryTimes = jobConfig.getRetryTimes();
-        int concurrency = jobConfig.getConcurrency();
-        return new ImporterConfiguration(jobConfig.getTarget(), shardingColumnsMap, mapper, batchSize, writeRateLimitAlgorithm, retryTimes, concurrency);
+    public MigrationJob() {
+        executor = new DistributedPipelineJobExecutor(new MigrationJobExecutorCallback());
     }
     
     @Override
-    protected PipelineTasksRunner buildTasksRunner(final MigrationJobItemContext jobItemContext) {
-        return new TransmissionTasksRunner(jobItemContext);
+    public PipelineJobRunnerManager getJobRunnerManager() {
+        return executor.getJobRunnerManager();
     }
     
     @Override
-    protected void doPrepare(final MigrationJobItemContext jobItemContext) throws SQLException {
-        jobPreparer.prepare(jobItemContext);
+    public void execute(final ShardingContext shardingContext) {
+        executor.execute(shardingContext);
     }
 }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
similarity index 86%
copy from kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
copy to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
index aea9d599519..0d0b136c2fd 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
@@ -21,10 +21,11 @@ import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfigurati
 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.executor.DistributedPipelineJobExecutorCallback;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
@@ -49,17 +50,16 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * Migration job.
+ * Migration job executor callback.
  */
-public final class MigrationJob extends AbstractSeparablePipelineJob<MigrationJobConfiguration, MigrationJobItemContext, TransmissionJobItemProgress> {
-    
-    private final MigrationJobPreparer jobPreparer = new MigrationJobPreparer();
+public final class MigrationJobExecutorCallback implements DistributedPipelineJobExecutorCallback<MigrationJobConfiguration, MigrationJobItemContext, TransmissionJobItemProgress> {
     
     @Override
-    protected MigrationJobItemContext buildJobItemContext(final MigrationJobConfiguration jobConfig,
-                                                          final int shardingItem, final TransmissionJobItemProgress jobItemProgress, final TransmissionProcessContext jobProcessContext) {
+    public MigrationJobItemContext buildJobItemContext(final MigrationJobConfiguration jobConfig, final int shardingItem,
+                                                       final TransmissionJobItemProgress jobItemProgress, final TransmissionProcessContext jobProcessContext,
+                                                       final PipelineDataSourceManager dataSourceManager) {
         MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getProcessConfiguration());
-        return new MigrationJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager());
+        return new MigrationJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext, taskConfig, dataSourceManager);
     }
     
     private MigrationTaskConfiguration buildTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
@@ -95,12 +95,12 @@ public final class MigrationJob extends AbstractSeparablePipelineJob<MigrationJo
     }
     
     @Override
-    protected PipelineTasksRunner buildTasksRunner(final MigrationJobItemContext jobItemContext) {
+    public PipelineTasksRunner buildTasksRunner(final MigrationJobItemContext jobItemContext) {
         return new TransmissionTasksRunner(jobItemContext);
     }
     
     @Override
-    protected void doPrepare(final MigrationJobItemContext jobItemContext) throws SQLException {
-        jobPreparer.prepare(jobItemContext);
+    public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException {
+        new MigrationJobPreparer().prepare(jobItemContext);
     }
 }
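
One subtle lifecycle shift above: MigrationJob previously held a single MigrationJobPreparer in a field, while the callback constructs a fresh MigrationJobPreparer on each prepare() call. Since the old field was only ever used from doPrepare, this should be equivalent, assuming the preparer keeps no state between calls.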
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
similarity index 94%
rename from test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
rename to test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
index 16468324e59..17066812b80 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobExecutorCallback;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -45,7 +45,7 @@ import java.util.Optional;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
-class ConsistencyCheckJobTest {
+class ConsistencyCheckJobExecutorCallbackTest {
     
     @BeforeAll
     static void beforeClass() {
@@ -59,11 +59,11 @@ class ConsistencyCheckJobTest {
         Map<String, Object> expectTableCheckPosition = Collections.singletonMap("t_order", 100);
         PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId, 0,
                 YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
-        ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob();
+        ConsistencyCheckJobExecutorCallback callback = new ConsistencyCheckJobExecutorCallback();
         ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(createYamlConsistencyCheckJobConfiguration(checkJobId));
         PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new ConsistencyCheckJobType().getYamlJobItemProgressSwapper());
         Optional<ConsistencyCheckJobItemProgress> jobItemProgress = jobItemManager.getProgress(jobConfig.getJobId(), 0);
-        ConsistencyCheckJobItemContext actual = consistencyCheckJob.buildJobItemContext(jobConfig, 0, jobItemProgress.orElse(null), null);
+        ConsistencyCheckJobItemContext actual = callback.buildJobItemContext(jobConfig, 0, jobItemProgress.orElse(null), null, null);
         assertThat(actual.getProgressContext().getSourceTableCheckPositions(), is(expectTableCheckPosition));
         assertThat(actual.getProgressContext().getTargetTableCheckPositions(), is(expectTableCheckPosition));
     }
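
The test rename mirrors the production split: with the hooks on a plain callback object, the test can invoke buildJobItemContext directly instead of going through a job instance, and the two trailing null arguments stand in for the TransmissionProcessContext and PipelineDataSourceManager, both of which the consistency check implementation ignores.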

