This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new cc1e4319c3b Refactor MigrationJobPreparer (#32769)
cc1e4319c3b is described below

commit cc1e4319c3b416deab1ebe9ded26086262fd2117
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Sep 2 18:49:28 2024 +0800

    Refactor MigrationJobPreparer (#32769)
    
    * Add final on TransmissionTasksRunner
    
    * Refactor MigrationJobPreparer
---
 .../core/task/runner/TransmissionTasksRunner.java  |  2 +-
 .../migration/preparer/MigrationJobPreparer.java   | 30 +++++++++++++---------
 2 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
index 0d84ed1f7df..0d1a2647c5e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
@@ -46,7 +46,7 @@ import java.util.concurrent.CompletableFuture;
  */
 @RequiredArgsConstructor
 @Slf4j
-public class TransmissionTasksRunner implements PipelineTasksRunner {
+public final class TransmissionTasksRunner implements PipelineTasksRunner {
     
     @Getter
     private final TransmissionJobItemContext jobItemContext;
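
The first change is a small hardening: with @RequiredArgsConstructor injecting its dependencies, the runner is not designed for extension, and adding final makes that explicit at compile time. A minimal sketch of the effect (the class and field below are illustrative stand-ins, not repository code):

    // A final class cannot be subclassed, so its task lifecycle cannot be overridden.
    public final class RunnerExample {
        
        private final String jobItemContext; // constructor-injected, never reassigned
        
        public RunnerExample(final String jobItemContext) {
            this.jobItemContext = jobItemContext;
        }
    }
    
    // public class CustomRunner extends RunnerExample { } // would no longer compile
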
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 7679a0ab4ea..2414c46dbc2 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSour
 import org.apache.shardingsphere.data.pipeline.core.channel.IncrementalChannelCreator;
 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCheckEngine;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
@@ -51,6 +52,7 @@ import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.Pr
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
 import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.splitter.InventoryTaskSplitter;
+import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.job.PipelineJobOffsetGovernanceRepository;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
@@ -69,6 +71,7 @@ import org.apache.shardingsphere.infra.lock.LockDefinition;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
 import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
+import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
 
 import java.sql.SQLException;
@@ -110,35 +113,38 @@ public final class MigrationJobPreparer implements PipelineJobPreparer<Migration
     private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItemContext) throws SQLException {
         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
         String jobId = jobConfig.getJobId();
-        LockContext lockContext = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager().getComputeNodeInstanceContext().getLockContext();
+        PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
+        ContextManager contextManager = PipelineContextManager.getContext(contextKey).getContextManager();
+        LockContext lockContext = contextManager.getComputeNodeInstanceContext().getLockContext();
         if (!jobItemManager.getProgress(jobId, jobItemContext.getShardingItem()).isPresent()) {
             jobItemManager.persistProgress(jobItemContext);
         }
         LockDefinition lockDefinition = new GlobalLockDefinition(String.format(GlobalLockNames.PREPARE.getLockName(), jobConfig.getJobId()));
         long startTimeMillis = System.currentTimeMillis();
-        if (lockContext.tryLock(lockDefinition, 600000L)) {
-            log.info("try lock success, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
+        if (lockContext.tryLock(lockDefinition, 600 * 1000L)) {
+            log.info("Lock success, jobId={}, shardingItem={}, cost {} ms.", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
             try {
-                JobOffsetInfo offsetInfo = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getOffset().load(jobId);
+                PipelineJobOffsetGovernanceRepository offsetRepository = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getOffset();
+                JobOffsetInfo offsetInfo = offsetRepository.load(jobId);
                 if (!offsetInfo.isTargetSchemaTableCreated()) {
                     jobItemContext.setStatus(JobStatus.PREPARING);
                     jobItemManager.updateStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING);
-                    prepareAndCheckTarget(jobItemContext);
-                    PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getOffset().persist(jobId, new JobOffsetInfo(true));
+                    prepareAndCheckTarget(jobItemContext, contextManager);
+                    offsetRepository.persist(jobId, new JobOffsetInfo(true));
                 }
             } finally {
-                log.info("Unlock, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
+                log.info("Unlock, jobId={}, shardingItem={}, cost {} ms.", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
                 lockContext.unlock(lockDefinition);
             }
         } else {
-            log.warn("Try lock failed, jobId={}, shardingItem={}", jobId, jobItemContext.getShardingItem());
+            log.warn("Lock failed, jobId={}, shardingItem={}.", jobId, jobItemContext.getShardingItem());
        }
     }
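
The refactored method above follows a common guard shape: resolve the shared context once, take a global lock with a timeout, run idempotent preparation guarded by a persisted "already created" flag, and always unlock in finally. A condensed sketch of that control flow, using java.util.concurrent stand-ins rather than the pipeline's LockContext and JobOffsetInfo types (all names below are illustrative):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;
    
    public final class PrepareOnceExample {
        
        private final ReentrantLock lock = new ReentrantLock();
        
        private volatile boolean targetCreated; // persisted JobOffsetInfo flag in the real code
        
        public void prepareWithLock() throws InterruptedException {
            // Wait up to 10 minutes, matching the diff's 600 * 1000L millisecond timeout.
            if (!lock.tryLock(600, TimeUnit.SECONDS)) {
                return; // lock failed: another instance is preparing
            }
            try {
                if (!targetCreated) {
                    prepareTarget();     // runs at most once across competing callers
                    targetCreated = true;
                }
            } finally {
                lock.unlock();           // always released, mirroring the finally block
            }
        }
        
        private void prepareTarget() {
        }
    }
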
     
-    private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext) throws SQLException {
+    private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext, final ContextManager contextManager) throws SQLException {
         DatabaseType targetDatabaseType = jobItemContext.getJobConfig().getTargetDatabaseType();
         if (jobItemContext.isSourceTargetDatabaseTheSame()) {
-            prepareTarget(jobItemContext, targetDatabaseType);
+            prepareTarget(jobItemContext, targetDatabaseType, contextManager);
         }
         if (null == jobItemContext.getInitProgress()) {
             PipelineDataSourceWrapper targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
@@ -146,13 +152,13 @@ public final class MigrationJobPreparer implements PipelineJobPreparer<Migration
         }
     }
     
-    private void prepareTarget(final MigrationJobItemContext jobItemContext, final DatabaseType targetDatabaseType) throws SQLException {
+    private void prepareTarget(final MigrationJobItemContext jobItemContext, final DatabaseType targetDatabaseType, final ContextManager contextManager) throws SQLException {
         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
         Collection<CreateTableConfiguration> createTableConfigs = jobItemContext.getTaskConfig().getCreateTableConfigurations();
         PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager();
         PipelineJobDataSourcePreparer preparer = new PipelineJobDataSourcePreparer(targetDatabaseType);
         preparer.prepareTargetSchemas(new PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, dataSourceManager));
-        ShardingSphereMetaData metaData = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())).getContextManager().getMetaDataContexts().getMetaData();
+        ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
         SQLParserEngine sqlParserEngine = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
                 .getSQLParserEngine(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType());
         preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine));
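
Taken together, the MigrationJobPreparer changes are a hoist-and-pass refactoring: the ContextManager is resolved once from the job id in the caller and threaded through as a parameter, instead of each method repeating the parseContextKey/getContext lookup chain. A small before/after sketch of the pattern (the registry map and parseContextKey helper here are invented for illustration):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    public final class HoistAndPassExample {
        
        private static final Map<String, Object> CONTEXTS = new ConcurrentHashMap<>();
        
        // Before: the callee re-derives the context from the job id on every call.
        private static void prepareBefore(final String jobId) {
            Object contextManager = CONTEXTS.get(parseContextKey(jobId));
            use(contextManager);
        }
        
        // After: the caller resolves the context once and passes it down.
        private static void prepareAfter(final String jobId) {
            Object contextManager = CONTEXTS.get(parseContextKey(jobId));
            prepareTarget(contextManager);
        }
        
        private static void prepareTarget(final Object contextManager) {
            use(contextManager); // no repeated parsing or registry lookups
        }
        
        private static String parseContextKey(final String jobId) {
            return jobId.substring(0, Math.min(2, jobId.length()));
        }
        
        private static void use(final Object contextManager) {
        }
    }
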
