This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 09cb7af7fec Refactor pipeline job prepare stage and related code 
(#24425)
09cb7af7fec is described below

commit 09cb7af7fec5d867e63c9ce65ccef5681d878419
Author: Xinze Guo <101622833+aze...@users.noreply.github.com>
AuthorDate: Thu Mar 2 19:58:52 2023 +0800

    Refactor pipeline job prepare stage and related code (#24425)
    
    * Refactor pipeline job prepare stage and related code
    
    * Rename
    
    * Remove unused class
---
 .../data/pipeline/api/job/JobStatus.java           |  5 -----
 .../progress/JobOffsetInfo.java}                   | 14 +++++-------
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  2 --
 .../data/pipeline/cdc/core/job/CDCJob.java         |  2 +-
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  | 16 +------------
 .../pipeline/core/api/GovernanceRepositoryAPI.java | 16 +++++++++++++
 .../core/api/InventoryIncrementalJobAPI.java       | 17 ++++++++++++++
 .../AbstractInventoryIncrementalJobAPIImpl.java    | 21 +++++++++++++++++
 .../core/api/impl/GovernanceRepositoryAPIImpl.java | 11 +++++++++
 .../core/job/progress/yaml/YamlJobOffsetInfo.java} | 17 ++++++--------
 .../progress/yaml/YamlJobOffsetInfoSwapper.java}   | 26 +++++++++++++---------
 .../migration/prepare/MigrationJobPreparer.java    | 19 ++++++----------
 12 files changed, 101 insertions(+), 65 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
index 50f7272f2fc..d31c2505715 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
@@ -37,11 +37,6 @@ public enum JobStatus {
      */
     PREPARING(true),
     
-    /**
-     * Job is in prepare success status.
-     */
-    PREPARE_SUCCESS(true),
-    
     /**
      * Job is in execute inventory task status.
      */
diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobOffsetInfo.java
similarity index 74%
copy from 
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
copy to 
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobOffsetInfo.java
index 77e836483c7..21201168ba6 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobOffsetInfo.java
@@ -15,21 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+package org.apache.shardingsphere.data.pipeline.api.job.progress;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 
 /**
- * CDC table based pipeline job info.
+ * Job offset info.
  */
-@Getter
 @RequiredArgsConstructor
-public class CDCTableBasedPipelineJobInfo implements PipelineJobInfo {
-    
-    private final PipelineJobMetaData jobMetaData;
-    
-    private final String databaseName;
+@Getter
+public final class JobOffsetInfo {
     
-    private final String schemaTableNames;
+    private final boolean targetSchemaTableCreated;
 }
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 89ba6a17672..6f644730758 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -36,7 +36,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
@@ -167,7 +166,6 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
                 IncrementalTaskProgress incrementalTaskProgress = new 
IncrementalTaskProgress();
                 
incrementalTaskProgress.setPosition(PipelineJobPreparerUtils.getIncrementalPosition(null,
 dumperConfig, dataSourceManager));
                 jobItemProgress.setIncremental(new 
JobItemIncrementalTasksProgress(incrementalTaskProgress));
-                jobItemProgress.setStatus(JobStatus.PREPARE_SUCCESS);
                 
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, 
i, 
YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
             }
         } catch (final SQLException ex) {
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 6479e96d100..4e10f0ca63d 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -56,7 +56,7 @@ public final class CDCJob extends AbstractSimplePipelineJob {
     
     @Override
     protected void doPrepare(final PipelineJobItemContext jobItemContext) {
-        jobPreparer.prepare((CDCJobItemContext) jobItemContext);
+        jobPreparer.initTasks((CDCJobItemContext) jobItemContext);
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 8f789c6c2bf..b87f057d9ed 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -19,7 +19,6 @@ package 
org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
 
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
@@ -53,7 +52,7 @@ public final class CDCJobPreparer {
      *
      * @param jobItemContext job item context
      */
-    public void prepare(final CDCJobItemContext jobItemContext) {
+    public void initTasks(final CDCJobItemContext jobItemContext) {
         Optional<InventoryIncrementalJobItemProgress> jobItemProgress = 
jobAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
         if (!jobItemProgress.isPresent()) {
             jobAPI.persistJobItemProgress(jobItemContext);
@@ -62,24 +61,11 @@ public final class CDCJobPreparer {
             PipelineJobCenter.stop(jobItemContext.getJobId());
             return;
         }
-        boolean needUpdateJobStatus = !jobItemProgress.isPresent() || 
JobStatus.PREPARING.equals(jobItemContext.getStatus()) || 
JobStatus.RUNNING.equals(jobItemContext.getStatus())
-                || 
JobStatus.PREPARING_FAILURE.equals(jobItemContext.getStatus());
-        if (needUpdateJobStatus) {
-            updateJobItemStatus(JobStatus.PREPARING, jobItemContext);
-        }
         initIncrementalTasks(jobItemContext);
         CDCJobConfiguration jobConfig = jobItemContext.getJobConfig();
         if (jobConfig.isFull()) {
             initInventoryTasks(jobItemContext);
         }
-        if (needUpdateJobStatus) {
-            updateJobItemStatus(JobStatus.PREPARE_SUCCESS, jobItemContext);
-        }
-    }
-    
-    private void updateJobItemStatus(final JobStatus jobStatus, final 
CDCJobItemContext jobItemContext) {
-        jobItemContext.setStatus(jobStatus);
-        jobAPI.persistJobItemProgress(jobItemContext);
     }
     
     private void initInventoryTasks(final CDCJobItemContext jobItemContext) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index b65b5632713..f58bf278fb6 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -39,6 +39,22 @@ public interface GovernanceRepositoryAPI {
      */
     boolean isExisted(String key);
     
+    /**
+     * Persist job offset info.
+     *
+     * @param jobId job id
+     * @param jobOffsetInfo job offset info
+     */
+    void persistJobOffsetInfo(String jobId, String jobOffsetInfo);
+    
+    /**
+     * Get job offset info.
+     *
+     * @param jobId job id
+     * @return job offset info
+     */
+    Optional<String> getJobOffsetInfo(String jobId);
+    
     /**
      * Persist job item progress.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index 8e47caca94c..7ed1701a46f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -21,6 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
@@ -52,6 +53,22 @@ public interface InventoryIncrementalJobAPI extends 
PipelineJobAPI {
      */
     PipelineProcessConfiguration showProcessConfiguration();
     
+    /**
+     * Persist job offset info.
+     *
+     * @param jobId job id
+     * @param jobOffsetInfo job offset info.
+     */
+    void persistJobOffsetInfo(String jobId, JobOffsetInfo jobOffsetInfo);
+    
+    /**
+     * Get job offset info.
+     *
+     * @param jobId job id
+     * @return job offset progress
+     */
+    JobOffsetInfo getJobOffsetInfo(String jobId);
+    
     /**
      * Get job progress.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index fa694640d81..8188d0cc8e3 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -29,6 +29,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
@@ -41,6 +42,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
 import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobOffsetInfo;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobOffsetInfoSwapper;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
@@ -75,6 +78,8 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl 
extends AbstractPip
     @Getter(AccessLevel.PROTECTED)
     private final YamlInventoryIncrementalJobItemProgressSwapper 
jobItemProgressSwapper = new YamlInventoryIncrementalJobItemProgressSwapper();
     
+    private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new 
YamlJobOffsetInfoSwapper();
+    
     protected abstract String getTargetDatabaseType(PipelineJobConfiguration 
pipelineJobConfig);
     
     @Override
@@ -158,6 +163,22 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
     }
     
+    @Override
+    public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo 
jobOffsetInfo) {
+        String value = 
YamlEngine.marshal(jobOffsetInfoSwapper.swapToYamlConfiguration(jobOffsetInfo));
+        
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobOffsetInfo(jobId, 
value);
+    }
+    
+    @Override
+    public JobOffsetInfo getJobOffsetInfo(final String jobId) {
+        Optional<String> offsetInfo = 
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobOffsetInfo(jobId);
+        if (offsetInfo.isPresent()) {
+            YamlJobOffsetInfo info = YamlEngine.unmarshal(offsetInfo.get(), 
YamlJobOffsetInfo.class);
+            return jobOffsetInfoSwapper.swapToObject(info);
+        }
+        return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo());
+    }
+    
     @Override
     public Optional<InventoryIncrementalJobItemProgress> 
getJobItemProgress(final String jobId, final int shardingItem) {
         Optional<String> progress = 
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, 
shardingItem);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index d28a50a2519..95e695ebfe1 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -55,6 +55,17 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
         return null != repository.getDirectly(key);
     }
     
+    @Override
+    public void persistJobOffsetInfo(final String jobId, final String 
jobOffsetInfo) {
+        repository.persist(PipelineMetaDataNode.getJobOffsetPath(jobId), 
jobOffsetInfo);
+    }
+    
+    @Override
+    public Optional<String> getJobOffsetInfo(final String jobId) {
+        String text = 
repository.getDirectly(PipelineMetaDataNode.getJobOffsetPath(jobId));
+        return Strings.isNullOrEmpty(text) ? Optional.empty() : 
Optional.of(text);
+    }
+    
     @Override
     public void persistJobItemProgress(final String jobId, final int 
shardingItem, final String progressValue) {
         repository.persist(PipelineMetaDataNode.getJobOffsetItemPath(jobId, 
shardingItem), progressValue);
diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfo.java
similarity index 69%
copy from 
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfo.java
index 77e836483c7..b238f1b99e3 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfo.java
@@ -15,21 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 
 /**
- * CDC table based pipeline job info.
+ * Yaml job offset info.
  */
 @Getter
-@RequiredArgsConstructor
-public class CDCTableBasedPipelineJobInfo implements PipelineJobInfo {
+@Setter
+public final class YamlJobOffsetInfo implements YamlConfiguration {
     
-    private final PipelineJobMetaData jobMetaData;
-    
-    private final String databaseName;
-    
-    private final String schemaTableNames;
+    private boolean targetSchemaTableCreated;
 }
diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfoSwapper.java
similarity index 50%
rename from 
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfoSwapper.java
index 77e836483c7..a8efecd574a 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfoSwapper.java
@@ -15,21 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo;
+import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
 /**
- * CDC table based pipeline job info.
+ * Yaml job offset info swapper.
  */
-@Getter
-@RequiredArgsConstructor
-public class CDCTableBasedPipelineJobInfo implements PipelineJobInfo {
+public final class YamlJobOffsetInfoSwapper implements 
YamlConfigurationSwapper<YamlJobOffsetInfo, JobOffsetInfo> {
     
-    private final PipelineJobMetaData jobMetaData;
+    @Override
+    public YamlJobOffsetInfo swapToYamlConfiguration(final JobOffsetInfo data) 
{
+        YamlJobOffsetInfo result = new YamlJobOffsetInfo();
+        result.setTargetSchemaTableCreated(data.isTargetSchemaTableCreated());
+        return result;
+    }
     
-    private final String databaseName;
-    
-    private final String schemaTableNames;
+    @Override
+    public JobOffsetInfo swapToObject(final YamlJobOffsetInfo yamlConfig) {
+        return new JobOffsetInfo(yamlConfig.isTargetSchemaTableCreated());
+    }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index bfde585bee7..40c3254150d 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
@@ -51,7 +52,6 @@ import 
org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Map.Entry;
-import java.util.Optional;
 
 /**
  * Migration job preparer.
@@ -99,27 +99,22 @@ public final class MigrationJobPreparer {
         }
         LockDefinition lockDefinition = new GlobalLockDefinition(lockName);
         long startTimeMillis = System.currentTimeMillis();
-        if (lockContext.tryLock(lockDefinition, 180000)) {
+        if (lockContext.tryLock(lockDefinition, 600000)) {
             log.info("try lock success, jobId={}, shardingItem={}, cost {} 
ms", jobConfig.getJobId(), jobItemContext.getShardingItem(), 
System.currentTimeMillis() - startTimeMillis);
             try {
-                Optional<InventoryIncrementalJobItemProgress> jobItemProgress 
= jobAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
-                JobStatus currentStatus = 
jobItemProgress.map(InventoryIncrementalJobItemProgress::getStatus).orElse(null);
-                boolean prepareFlag = !jobItemProgress.isPresent() || 
JobStatus.PREPARING.equals(currentStatus) || 
JobStatus.RUNNING.equals(currentStatus)
-                        || JobStatus.PREPARING_FAILURE.equals(currentStatus);
-                if (prepareFlag) {
+                JobOffsetInfo offsetInfo = 
jobAPI.getJobOffsetInfo(jobConfig.getJobId());
+                if (!offsetInfo.isTargetSchemaTableCreated()) {
                     jobItemContext.setStatus(JobStatus.PREPARING);
                     jobAPI.updateJobItemStatus(jobConfig.getJobId(), 
jobItemContext.getShardingItem(), JobStatus.PREPARING);
                     prepareAndCheckTarget(jobItemContext);
-                    // TODO Loop insert zookeeper performance is not good
-                    for (int i = 0; i <= 
jobItemContext.getJobConfig().getJobShardingCount(); i++) {
-                        jobItemContext.setStatus(JobStatus.PREPARE_SUCCESS);
-                        jobAPI.updateJobItemStatus(jobConfig.getJobId(), i, 
JobStatus.PREPARE_SUCCESS);
-                    }
+                    jobAPI.persistJobOffsetInfo(jobConfig.getJobId(), new 
JobOffsetInfo(true));
                 }
             } finally {
                 log.info("unlock, jobId={}, shardingItem={}, cost {} ms", 
jobConfig.getJobId(), jobItemContext.getShardingItem(), 
System.currentTimeMillis() - startTimeMillis);
                 lockContext.unlock(lockDefinition);
             }
+        } else {
+            log.warn("jobId={}, shardingItem={} try lock failed", 
jobConfig.getJobId(), jobItemContext.getShardingItem());
         }
     }
     

Reply via email to