This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d801647117e Add PipelineJobExecutor (#32762)
d801647117e is described below
commit d801647117ecc468f97d7e1b33ef7e8f1ba996b1
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Sep 1 20:20:15 2024 +0800
Add PipelineJobExecutor (#32762)
* Add PipelineJobExecutor
* Add PipelineJobExecutor
* Add PipelineJobExecutor
* Add PipelineJobExecutor
---
.../DistributedPipelineJobExecutor.java} | 53 +++++++-------
.../DistributedPipelineJobExecutorCallback.java | 65 +++++++++++++++++
.../consistencycheck/ConsistencyCheckJob.java | 29 ++++----
...va => ConsistencyCheckJobExecutorCallback.java} | 18 +++--
.../pipeline/scenario/migration/MigrationJob.java | 85 +++-------------------
...nJob.java => MigrationJobExecutorCallback.java} | 22 +++---
...> ConsistencyCheckJobExecutorCallbackTest.java} | 8 +-
7 files changed, 141 insertions(+), 139 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutor.java
similarity index 74%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutor.java
index 7a021fdf93b..a18639f6f83 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutor.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.job;
+package org.apache.shardingsphere.data.pipeline.core.job.executor;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
@@ -41,25 +42,25 @@ import
org.apache.shardingsphere.elasticjob.api.ShardingContext;
import java.sql.SQLException;
/**
- * Abstract separable pipeline job.
- *
- * @param <T> type of pipeline job configuration
- * @param <I> type of pipeline job item context
- * @param <P> type of pipeline job item progress
+ * Distributed pipeline job executor.
*/
-@Getter
+@RequiredArgsConstructor
@Slf4j
-public abstract class AbstractSeparablePipelineJob<T extends
PipelineJobConfiguration, I extends PipelineJobItemContext, P extends
PipelineJobItemProgress> implements PipelineJob {
+public final class DistributedPipelineJobExecutor {
- private final PipelineJobRunnerManager jobRunnerManager;
+ @SuppressWarnings("rawtypes")
+ private final DistributedPipelineJobExecutorCallback callback;
- protected AbstractSeparablePipelineJob() {
- jobRunnerManager = new PipelineJobRunnerManager();
- }
+ @Getter
+ private final PipelineJobRunnerManager jobRunnerManager = new
PipelineJobRunnerManager();
+ /**
+ * Execute job.
+ *
+ * @param shardingContext sharding context
+ */
@SuppressWarnings("unchecked")
- @Override
- public final void execute(final ShardingContext shardingContext) {
+ public void execute(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
int shardingItem = shardingContext.getShardingItem();
log.info("Execute job {}-{}.", jobId, shardingItem);
@@ -69,14 +70,14 @@ public abstract class AbstractSeparablePipelineJob<T
extends PipelineJobConfigur
}
PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(jobId);
- T jobConfig = (T)
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- PipelineJobItemManager<P> jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
- P jobItemProgress =
jobItemManager.getProgress(shardingContext.getJobName(),
shardingItem).orElse(null);
+ PipelineJobConfiguration jobConfig =
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+ PipelineJobItemManager<PipelineJobItemProgress> jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+ PipelineJobItemProgress jobItemProgress =
jobItemManager.getProgress(shardingContext.getJobName(),
shardingItem).orElse(null);
TransmissionProcessContext jobProcessContext =
createTransmissionProcessContext(jobId, jobType, contextKey);
PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
boolean started = false;
try {
- started = execute(buildJobItemContext(jobConfig, shardingItem,
jobItemProgress, jobProcessContext), governanceFacade);
+ started = execute(callback.buildJobItemContext(jobConfig,
shardingItem, jobItemProgress, jobProcessContext,
jobRunnerManager.getDataSourceManager()), governanceFacade);
if (started) {
PipelineJobProgressPersistService.persistNow(jobId,
shardingItem);
}
@@ -95,9 +96,10 @@ public abstract class AbstractSeparablePipelineJob<T extends
PipelineJobConfigur
}
}
- private boolean execute(final I jobItemContext, final
PipelineGovernanceFacade governanceFacade) {
+ @SuppressWarnings("unchecked")
+ private boolean execute(final PipelineJobItemContext jobItemContext, final
PipelineGovernanceFacade governanceFacade) {
int shardingItem = jobItemContext.getShardingItem();
- PipelineTasksRunner tasksRunner = buildTasksRunner(jobItemContext);
+ PipelineTasksRunner tasksRunner =
callback.buildTasksRunner(jobItemContext);
if (!jobRunnerManager.addTasksRunner(shardingItem, tasksRunner)) {
return false;
}
@@ -117,19 +119,14 @@ public abstract class AbstractSeparablePipelineJob<T
extends PipelineJobConfigur
return new TransmissionProcessContext(jobId, processConfig);
}
- protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P
jobItemProgress, TransmissionProcessContext jobProcessContext);
-
- protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
-
- protected final void prepare(final I jobItemContext) {
+ @SuppressWarnings("unchecked")
+ private void prepare(final PipelineJobItemContext jobItemContext) {
try {
- doPrepare(jobItemContext);
+ callback.prepare(jobItemContext);
// CHECKSTYLE:OFF
} catch (final SQLException ex) {
// CHECKSTYLE:ON
throw new PipelineInternalException(ex);
}
}
-
- protected abstract void doPrepare(I jobItemContext) throws SQLException;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutorCallback.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutorCallback.java
new file mode 100644
index 00000000000..54ca60c0368
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutorCallback.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.executor;
+
+import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
+
+import java.sql.SQLException;
+
+/**
+ * Distributed pipeline job executor callback.
+ *
+ * @param <T> type of pipeline job configuration
+ * @param <I> type of pipeline job item context
+ * @param <P> type of pipeline job item progress
+ */
+public interface DistributedPipelineJobExecutorCallback<T extends
PipelineJobConfiguration, I extends PipelineJobItemContext, P extends
PipelineJobItemProgress> {
+
+ /**
+ * Build job item context.
+ *
+ * @param jobConfig job configuration
+ * @param shardingItem sharding item
+ * @param jobItemProgress job item progress
+ * @param jobProcessContext job process context
+ * @param dataSourceManager pipeline data source manager
+ * @return built job item context
+ */
+ I buildJobItemContext(T jobConfig, int shardingItem, P jobItemProgress,
TransmissionProcessContext jobProcessContext, PipelineDataSourceManager
dataSourceManager);
+
+ /**
+ * Build tasks runner.
+ *
+ * @param jobItemContext job item context
+ * @return built tasks runner
+ */
+ PipelineTasksRunner buildTasksRunner(I jobItemContext);
+
+ /**
+ * Prepare.
+ *
+ * @param jobItemContext job item context
+ * @throws SQLException SQL exception
+ */
+ void prepare(I jobItemContext) throws SQLException;
+}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index fca9649325c..28a2cf5683a 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -17,32 +17,29 @@
package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
-import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
-import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task.ConsistencyCheckTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
+import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.executor.DistributedPipelineJobExecutor;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
/**
* Consistency check job.
*/
-public final class ConsistencyCheckJob extends
AbstractSeparablePipelineJob<ConsistencyCheckJobConfiguration,
ConsistencyCheckJobItemContext, ConsistencyCheckJobItemProgress> {
+public final class ConsistencyCheckJob implements PipelineJob {
- @Override
- public ConsistencyCheckJobItemContext buildJobItemContext(final
ConsistencyCheckJobConfiguration jobConfig,
- final int
shardingItem, final ConsistencyCheckJobItemProgress jobItemProgress, final
TransmissionProcessContext jobProcessContext) {
- return new ConsistencyCheckJobItemContext(jobConfig, shardingItem,
JobStatus.RUNNING, jobItemProgress);
+ private final DistributedPipelineJobExecutor executor;
+
+ public ConsistencyCheckJob() {
+ executor = new DistributedPipelineJobExecutor(new
ConsistencyCheckJobExecutorCallback());
}
@Override
- protected PipelineTasksRunner buildTasksRunner(final
ConsistencyCheckJobItemContext jobItemContext) {
- return new ConsistencyCheckTasksRunner(jobItemContext);
+ public PipelineJobRunnerManager getJobRunnerManager() {
+ return executor.getJobRunnerManager();
}
@Override
- protected void doPrepare(final ConsistencyCheckJobItemContext
jobItemContext) {
+ public void execute(final ShardingContext shardingContext) {
+ executor.execute(shardingContext);
}
}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallback.java
similarity index 67%
copy from kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
copy to kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallback.java
index fca9649325c..456cf20e326 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallback.java
@@ -18,8 +18,9 @@
package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.core.job.executor.DistributedPipelineJobExecutorCallback;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -27,22 +28,25 @@ import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task.ConsistencyCheckTasksRunner;
/**
- * Consistency check job.
+ * Consistency check job executor callback.
*/
-public final class ConsistencyCheckJob extends
AbstractSeparablePipelineJob<ConsistencyCheckJobConfiguration,
ConsistencyCheckJobItemContext, ConsistencyCheckJobItemProgress> {
+public final class ConsistencyCheckJobExecutorCallback
+ implements
+
DistributedPipelineJobExecutorCallback<ConsistencyCheckJobConfiguration,
ConsistencyCheckJobItemContext, ConsistencyCheckJobItemProgress> {
@Override
- public ConsistencyCheckJobItemContext buildJobItemContext(final
ConsistencyCheckJobConfiguration jobConfig,
- final int
shardingItem, final ConsistencyCheckJobItemProgress jobItemProgress, final
TransmissionProcessContext jobProcessContext) {
+ public ConsistencyCheckJobItemContext buildJobItemContext(final
ConsistencyCheckJobConfiguration jobConfig, final int shardingItem,
+ final
ConsistencyCheckJobItemProgress jobItemProgress, final
TransmissionProcessContext jobProcessContext,
+ final
PipelineDataSourceManager dataSourceManager) {
return new ConsistencyCheckJobItemContext(jobConfig, shardingItem,
JobStatus.RUNNING, jobItemProgress);
}
@Override
- protected PipelineTasksRunner buildTasksRunner(final
ConsistencyCheckJobItemContext jobItemContext) {
+ public PipelineTasksRunner buildTasksRunner(final
ConsistencyCheckJobItemContext jobItemContext) {
return new ConsistencyCheckTasksRunner(jobItemContext);
}
@Override
- protected void doPrepare(final ConsistencyCheckJobItemContext
jobItemContext) {
+ public void prepare(final ConsistencyCheckJobItemContext jobItemContext) {
}
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index aea9d599519..fe970619b40 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -17,90 +17,29 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
-import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
-import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
-import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import
org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner;
-import
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.MigrationIncrementalDumperContextCreator;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
-
-import java.sql.SQLException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
+import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.executor.DistributedPipelineJobExecutor;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
/**
* Migration job.
*/
-public final class MigrationJob extends
AbstractSeparablePipelineJob<MigrationJobConfiguration,
MigrationJobItemContext, TransmissionJobItemProgress> {
-
- private final MigrationJobPreparer jobPreparer = new
MigrationJobPreparer();
-
- @Override
- protected MigrationJobItemContext buildJobItemContext(final
MigrationJobConfiguration jobConfig,
- final int
shardingItem, final TransmissionJobItemProgress jobItemProgress, final
TransmissionProcessContext jobProcessContext) {
- MigrationTaskConfiguration taskConfig =
buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getProcessConfiguration());
- return new MigrationJobItemContext(jobConfig, shardingItem,
jobItemProgress, jobProcessContext, taskConfig,
getJobRunnerManager().getDataSourceManager());
- }
+public final class MigrationJob implements PipelineJob {
- private MigrationTaskConfiguration buildTaskConfiguration(final
MigrationJobConfiguration jobConfig, final int jobShardingItem, final
PipelineProcessConfiguration processConfig) {
- IncrementalDumperContext incrementalDumperContext = new
MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
- Collection<CreateTableConfiguration> createTableConfigs =
buildCreateTableConfigurations(jobConfig,
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
- Set<CaseInsensitiveIdentifier> targetTableNames =
jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
- Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new
ShardingColumnsExtractor().getShardingColumnsMap(
- ((ShardingSpherePipelineDataSourceConfiguration)
jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
- ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, processConfig, shardingColumnsMap,
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
- return new
MigrationTaskConfiguration(incrementalDumperContext.getCommonContext().getDataSourceName(),
createTableConfigs, incrementalDumperContext, importerConfig);
- }
-
- private Collection<CreateTableConfiguration>
buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final
TableAndSchemaNameMapper mapper) {
- return
jobConfig.getTablesFirstDataNodes().getEntries().stream().map(each ->
getCreateTableConfiguration(jobConfig, mapper,
each)).collect(Collectors.toList());
- }
-
- private CreateTableConfiguration getCreateTableConfiguration(final
MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper mapper,
final JobDataNodeEntry jobDataNodeEntry) {
- DataNode dataNode = jobDataNodeEntry.getDataNodes().get(0);
- PipelineDataSourceConfiguration sourceDataSourceConfig =
jobConfig.getSources().get(dataNode.getDataSourceName());
- String sourceSchemaName =
mapper.getSchemaName(jobDataNodeEntry.getLogicTableName());
- String targetSchemaName = new
DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData().isSchemaAvailable()
? sourceSchemaName : null;
- return new CreateTableConfiguration(sourceDataSourceConfig, new
CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()),
- jobConfig.getTarget(), new
CaseInsensitiveQualifiedTable(targetSchemaName,
jobDataNodeEntry.getLogicTableName()));
- }
+ private final DistributedPipelineJobExecutor executor;
- private ImporterConfiguration buildImporterConfiguration(final
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration
pipelineProcessConfig,
- final
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final
TableAndSchemaNameMapper mapper) {
- int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
- JobRateLimitAlgorithm writeRateLimitAlgorithm = new
TransmissionProcessContext(jobConfig.getJobId(),
pipelineProcessConfig).getWriteRateLimitAlgorithm();
- int retryTimes = jobConfig.getRetryTimes();
- int concurrency = jobConfig.getConcurrency();
- return new ImporterConfiguration(jobConfig.getTarget(),
shardingColumnsMap, mapper, batchSize, writeRateLimitAlgorithm, retryTimes,
concurrency);
+ public MigrationJob() {
+ executor = new DistributedPipelineJobExecutor(new
MigrationJobExecutorCallback());
}
@Override
- protected PipelineTasksRunner buildTasksRunner(final
MigrationJobItemContext jobItemContext) {
- return new TransmissionTasksRunner(jobItemContext);
+ public PipelineJobRunnerManager getJobRunnerManager() {
+ return executor.getJobRunnerManager();
}
@Override
- protected void doPrepare(final MigrationJobItemContext jobItemContext)
throws SQLException {
- jobPreparer.prepare(jobItemContext);
+ public void execute(final ShardingContext shardingContext) {
+ executor.execute(shardingContext);
}
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
similarity index 86%
copy from kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
copy to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
index aea9d599519..0d0b136c2fd 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
@@ -21,10 +21,11 @@ import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfigurati
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
-import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
+import
org.apache.shardingsphere.data.pipeline.core.job.executor.DistributedPipelineJobExecutorCallback;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
@@ -49,17 +50,16 @@ import java.util.Set;
import java.util.stream.Collectors;
/**
- * Migration job.
+ * Migration job executor callback.
*/
-public final class MigrationJob extends
AbstractSeparablePipelineJob<MigrationJobConfiguration,
MigrationJobItemContext, TransmissionJobItemProgress> {
-
- private final MigrationJobPreparer jobPreparer = new
MigrationJobPreparer();
+public final class MigrationJobExecutorCallback implements
DistributedPipelineJobExecutorCallback<MigrationJobConfiguration,
MigrationJobItemContext, TransmissionJobItemProgress> {
@Override
- protected MigrationJobItemContext buildJobItemContext(final
MigrationJobConfiguration jobConfig,
- final int
shardingItem, final TransmissionJobItemProgress jobItemProgress, final
TransmissionProcessContext jobProcessContext) {
+ public MigrationJobItemContext buildJobItemContext(final
MigrationJobConfiguration jobConfig, final int shardingItem,
+ final
TransmissionJobItemProgress jobItemProgress, final TransmissionProcessContext
jobProcessContext,
+ final
PipelineDataSourceManager dataSourceManager) {
MigrationTaskConfiguration taskConfig =
buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getProcessConfiguration());
- return new MigrationJobItemContext(jobConfig, shardingItem,
jobItemProgress, jobProcessContext, taskConfig,
getJobRunnerManager().getDataSourceManager());
+ return new MigrationJobItemContext(jobConfig, shardingItem,
jobItemProgress, jobProcessContext, taskConfig, dataSourceManager);
}
private MigrationTaskConfiguration buildTaskConfiguration(final
MigrationJobConfiguration jobConfig, final int jobShardingItem, final
PipelineProcessConfiguration processConfig) {
@@ -95,12 +95,12 @@ public final class MigrationJob extends
AbstractSeparablePipelineJob<MigrationJo
}
@Override
- protected PipelineTasksRunner buildTasksRunner(final
MigrationJobItemContext jobItemContext) {
+ public PipelineTasksRunner buildTasksRunner(final MigrationJobItemContext
jobItemContext) {
return new TransmissionTasksRunner(jobItemContext);
}
@Override
- protected void doPrepare(final MigrationJobItemContext jobItemContext)
throws SQLException {
- jobPreparer.prepare(jobItemContext);
+ public void prepare(final MigrationJobItemContext jobItemContext) throws
SQLException {
+ new MigrationJobPreparer().prepare(jobItemContext);
}
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
similarity index 94%
rename from test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
rename to test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
index 16468324e59..17066812b80 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
@@ -24,7 +24,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobExecutorCallback;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -45,7 +45,7 @@ import java.util.Optional;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
-class ConsistencyCheckJobTest {
+class ConsistencyCheckJobExecutorCallbackTest {
@BeforeAll
static void beforeClass() {
@@ -59,11 +59,11 @@ class ConsistencyCheckJobTest {
Map<String, Object> expectTableCheckPosition =
Collections.singletonMap("t_order", 100);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId,
0,
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
- ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob();
+ ConsistencyCheckJobExecutorCallback callback = new
ConsistencyCheckJobExecutorCallback();
ConsistencyCheckJobConfiguration jobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(createYamlConsistencyCheckJobConfiguration(checkJobId));
PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(new
ConsistencyCheckJobType().getYamlJobItemProgressSwapper());
Optional<ConsistencyCheckJobItemProgress> jobItemProgress =
jobItemManager.getProgress(jobConfig.getJobId(), 0);
- ConsistencyCheckJobItemContext actual =
consistencyCheckJob.buildJobItemContext(jobConfig, 0,
jobItemProgress.orElse(null), null);
+ ConsistencyCheckJobItemContext actual =
callback.buildJobItemContext(jobConfig, 0, jobItemProgress.orElse(null), null,
null);
assertThat(actual.getProgressContext().getSourceTableCheckPositions(),
is(expectTableCheckPosition));
assertThat(actual.getProgressContext().getTargetTableCheckPositions(),
is(expectTableCheckPosition));
}