This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new cc1e4319c3b Refactor MigrationJobPreparer (#32769)
cc1e4319c3b is described below
commit cc1e4319c3b416deab1ebe9ded26086262fd2117
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Sep 2 18:49:28 2024 +0800
Refactor MigrationJobPreparer (#32769)
* Add final on TransmissionTasksRunner
* Refactor MigrationJobPreparer
---
.../core/task/runner/TransmissionTasksRunner.java | 2 +-
.../migration/preparer/MigrationJobPreparer.java | 30 +++++++++++++---------
2 files changed, 19 insertions(+), 13 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
index 0d84ed1f7df..0d1a2647c5e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
@@ -46,7 +46,7 @@ import java.util.concurrent.CompletableFuture;
*/
@RequiredArgsConstructor
@Slf4j
-public class TransmissionTasksRunner implements PipelineTasksRunner {
+public final class TransmissionTasksRunner implements PipelineTasksRunner {
@Getter
private final TransmissionJobItemContext jobItemContext;
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 7679a0ab4ea..2414c46dbc2 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSour
import
org.apache.shardingsphere.data.pipeline.core.channel.IncrementalChannelCreator;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCheckEngine;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
@@ -51,6 +52,7 @@ import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.Pr
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.splitter.InventoryTaskSplitter;
+import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.job.PipelineJobOffsetGovernanceRepository;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
@@ -69,6 +71,7 @@ import org.apache.shardingsphere.infra.lock.LockDefinition;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
+import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import java.sql.SQLException;
@@ -110,35 +113,38 @@ public final class MigrationJobPreparer implements
PipelineJobPreparer<Migration
private void prepareAndCheckTargetWithLock(final MigrationJobItemContext
jobItemContext) throws SQLException {
MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
String jobId = jobConfig.getJobId();
- LockContext lockContext =
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager().getComputeNodeInstanceContext().getLockContext();
+ PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(jobId);
+ ContextManager contextManager =
PipelineContextManager.getContext(contextKey).getContextManager();
+ LockContext lockContext =
contextManager.getComputeNodeInstanceContext().getLockContext();
if (!jobItemManager.getProgress(jobId,
jobItemContext.getShardingItem()).isPresent()) {
jobItemManager.persistProgress(jobItemContext);
}
LockDefinition lockDefinition = new
GlobalLockDefinition(String.format(GlobalLockNames.PREPARE.getLockName(),
jobConfig.getJobId()));
long startTimeMillis = System.currentTimeMillis();
- if (lockContext.tryLock(lockDefinition, 600000L)) {
- log.info("try lock success, jobId={}, shardingItem={}, cost {}
ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
+ if (lockContext.tryLock(lockDefinition, 600 * 1000L)) {
+ log.info("Lock success, jobId={}, shardingItem={}, cost {} ms.",
jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
try {
- JobOffsetInfo offsetInfo =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getOffset().load(jobId);
+ PipelineJobOffsetGovernanceRepository offsetRepository =
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getOffset();
+ JobOffsetInfo offsetInfo = offsetRepository.load(jobId);
if (!offsetInfo.isTargetSchemaTableCreated()) {
jobItemContext.setStatus(JobStatus.PREPARING);
jobItemManager.updateStatus(jobId,
jobItemContext.getShardingItem(), JobStatus.PREPARING);
- prepareAndCheckTarget(jobItemContext);
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getOffset().persist(jobId,
new JobOffsetInfo(true));
+ prepareAndCheckTarget(jobItemContext, contextManager);
+ offsetRepository.persist(jobId, new JobOffsetInfo(true));
}
} finally {
- log.info("Unlock, jobId={}, shardingItem={}, cost {} ms",
jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
+ log.info("Unlock, jobId={}, shardingItem={}, cost {} ms.",
jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
lockContext.unlock(lockDefinition);
}
} else {
- log.warn("Try lock failed, jobId={}, shardingItem={}", jobId,
jobItemContext.getShardingItem());
+ log.warn("Lock failed, jobId={}, shardingItem={}.", jobId,
jobItemContext.getShardingItem());
}
}
- private void prepareAndCheckTarget(final MigrationJobItemContext
jobItemContext) throws SQLException {
+ private void prepareAndCheckTarget(final MigrationJobItemContext
jobItemContext, final ContextManager contextManager) throws SQLException {
DatabaseType targetDatabaseType =
jobItemContext.getJobConfig().getTargetDatabaseType();
if (jobItemContext.isSourceTargetDatabaseTheSame()) {
- prepareTarget(jobItemContext, targetDatabaseType);
+ prepareTarget(jobItemContext, targetDatabaseType, contextManager);
}
if (null == jobItemContext.getInitProgress()) {
PipelineDataSourceWrapper targetDataSource =
jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
@@ -146,13 +152,13 @@ public final class MigrationJobPreparer implements
PipelineJobPreparer<Migration
}
}
- private void prepareTarget(final MigrationJobItemContext jobItemContext,
final DatabaseType targetDatabaseType) throws SQLException {
+ private void prepareTarget(final MigrationJobItemContext jobItemContext,
final DatabaseType targetDatabaseType, final ContextManager contextManager)
throws SQLException {
MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
Collection<CreateTableConfiguration> createTableConfigs =
jobItemContext.getTaskConfig().getCreateTableConfigurations();
PipelineDataSourceManager dataSourceManager =
jobItemContext.getDataSourceManager();
PipelineJobDataSourcePreparer preparer = new
PipelineJobDataSourcePreparer(targetDatabaseType);
preparer.prepareTargetSchemas(new
PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs,
dataSourceManager));
- ShardingSphereMetaData metaData =
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())).getContextManager().getMetaDataContexts().getMetaData();
+ ShardingSphereMetaData metaData =
contextManager.getMetaDataContexts().getMetaData();
SQLParserEngine sqlParserEngine =
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
.getSQLParserEngine(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType());
preparer.prepareTargetTables(new
PrepareTargetTablesParameter(createTableConfigs, dataSourceManager,
sqlParserEngine));