This is an automated email from the ASF dual-hosted git repository. zhonghongsheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 09cb7af7fec Refactor pipeline job prepare stage and related code (#24425) 09cb7af7fec is described below commit 09cb7af7fec5d867e63c9ce65ccef5681d878419 Author: Xinze Guo <101622833+aze...@users.noreply.github.com> AuthorDate: Thu Mar 2 19:58:52 2023 +0800 Refactor pipeline job prepare stage and related code (#24425) * Refactor pipeline job prepare stage and related code * Rename * Remove unused class --- .../data/pipeline/api/job/JobStatus.java | 5 ----- .../progress/JobOffsetInfo.java} | 14 +++++------- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 2 -- .../data/pipeline/cdc/core/job/CDCJob.java | 2 +- .../pipeline/cdc/core/prepare/CDCJobPreparer.java | 16 +------------ .../pipeline/core/api/GovernanceRepositoryAPI.java | 16 +++++++++++++ .../core/api/InventoryIncrementalJobAPI.java | 17 ++++++++++++++ .../AbstractInventoryIncrementalJobAPIImpl.java | 21 +++++++++++++++++ .../core/api/impl/GovernanceRepositoryAPIImpl.java | 11 +++++++++ .../core/job/progress/yaml/YamlJobOffsetInfo.java} | 17 ++++++-------- .../progress/yaml/YamlJobOffsetInfoSwapper.java} | 26 +++++++++++++--------- .../migration/prepare/MigrationJobPreparer.java | 19 ++++++---------- 12 files changed, 101 insertions(+), 65 deletions(-) diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java index 50f7272f2fc..d31c2505715 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java @@ -37,11 +37,6 @@ public enum JobStatus { */ PREPARING(true), - /** - * Job is in prepare success status. - */ - PREPARE_SUCCESS(true), - /** * Job is in execute inventory task status. */ diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobOffsetInfo.java similarity index 74% copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java copy to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobOffsetInfo.java index 77e836483c7..21201168ba6 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobOffsetInfo.java @@ -15,21 +15,17 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.api.pojo; +package org.apache.shardingsphere.data.pipeline.api.job.progress; import lombok.Getter; import lombok.RequiredArgsConstructor; /** - * CDC table based pipeline job info. + * Job offset info. */ -@Getter @RequiredArgsConstructor -public class CDCTableBasedPipelineJobInfo implements PipelineJobInfo { - - private final PipelineJobMetaData jobMetaData; - - private final String databaseName; +@Getter +public final class JobOffsetInfo { - private final String schemaTableNames; + private final boolean targetSchemaTableCreated; } diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 89ba6a17672..6f644730758 100644 --- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -36,7 +36,6 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper; -import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId; import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress; import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress; @@ -167,7 +166,6 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl { IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(); incrementalTaskProgress.setPosition(PipelineJobPreparerUtils.getIncrementalPosition(null, dumperConfig, dataSourceManager)); jobItemProgress.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress)); - jobItemProgress.setStatus(JobStatus.PREPARE_SUCCESS); PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, i, YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress))); } } catch (final SQLException ex) { diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java index 6479e96d100..4e10f0ca63d 100644 --- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java +++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java @@ -56,7 +56,7 @@ public final class CDCJob extends AbstractSimplePipelineJob { @Override protected void doPrepare(final PipelineJobItemContext jobItemContext) { - jobPreparer.prepare((CDCJobItemContext) jobItemContext); + jobPreparer.initTasks((CDCJobItemContext) jobItemContext); } @Override diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java index 8f789c6c2bf..b87f057d9ed 100644 --- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java +++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java @@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.prepare; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration; -import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress; import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress; import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader; @@ -53,7 +52,7 @@ public final class CDCJobPreparer { * * @param jobItemContext job item context */ - public void prepare(final CDCJobItemContext jobItemContext) { + public void initTasks(final CDCJobItemContext jobItemContext) { Optional<InventoryIncrementalJobItemProgress> jobItemProgress = jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem()); if (!jobItemProgress.isPresent()) { jobAPI.persistJobItemProgress(jobItemContext); @@ -62,24 +61,11 @@ public final class CDCJobPreparer { PipelineJobCenter.stop(jobItemContext.getJobId()); return; } - boolean needUpdateJobStatus = !jobItemProgress.isPresent() || JobStatus.PREPARING.equals(jobItemContext.getStatus()) || JobStatus.RUNNING.equals(jobItemContext.getStatus()) - || JobStatus.PREPARING_FAILURE.equals(jobItemContext.getStatus()); - if (needUpdateJobStatus) { - updateJobItemStatus(JobStatus.PREPARING, jobItemContext); - } initIncrementalTasks(jobItemContext); CDCJobConfiguration jobConfig = jobItemContext.getJobConfig(); if (jobConfig.isFull()) { initInventoryTasks(jobItemContext); } - if (needUpdateJobStatus) { - updateJobItemStatus(JobStatus.PREPARE_SUCCESS, jobItemContext); - } - } - - private void updateJobItemStatus(final JobStatus jobStatus, final CDCJobItemContext jobItemContext) { - jobItemContext.setStatus(jobStatus); - jobAPI.persistJobItemProgress(jobItemContext); } private void initInventoryTasks(final CDCJobItemContext jobItemContext) { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java index b65b5632713..f58bf278fb6 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java @@ -39,6 +39,22 @@ public interface GovernanceRepositoryAPI { */ boolean isExisted(String key); + /** + * Persist job offset info. + * + * @param jobId job id + * @param jobOffsetInfo job offset info + */ + void persistJobOffsetInfo(String jobId, String jobOffsetInfo); + + /** + * Get job offset info. + * + * @param jobId job id + * @return job offset info + */ + Optional<String> getJobOffsetInfo(String jobId); + /** * Persist job item progress. * diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java index 8e47caca94c..7ed1701a46f 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java @@ -21,6 +21,7 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo; import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo; import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo; import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext; @@ -52,6 +53,22 @@ public interface InventoryIncrementalJobAPI extends PipelineJobAPI { */ PipelineProcessConfiguration showProcessConfiguration(); + /** + * Persist job offset info. + * + * @param jobId job id + * @param jobOffsetInfo job offset info. + */ + void persistJobOffsetInfo(String jobId, JobOffsetInfo jobOffsetInfo); + + /** + * Get job offset info. + * + * @param jobId job id + * @return job offset progress + */ + JobOffsetInfo getJobOffsetInfo(String jobId); + /** * Get job progress. * diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java index fa694640d81..8188d0cc8e3 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java @@ -29,6 +29,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress; import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress; import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress; +import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo; import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo; import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo; import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress; @@ -41,6 +42,8 @@ import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext; import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper; +import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobOffsetInfo; +import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobOffsetInfoSwapper; import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask; import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask; import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm; @@ -75,6 +78,8 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip @Getter(AccessLevel.PROTECTED) private final YamlInventoryIncrementalJobItemProgressSwapper jobItemProgressSwapper = new YamlInventoryIncrementalJobItemProgressSwapper(); + private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new YamlJobOffsetInfoSwapper(); + protected abstract String getTargetDatabaseType(PipelineJobConfiguration pipelineJobConfig); @Override @@ -158,6 +163,22 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip return new JobItemInventoryTasksProgress(inventoryTaskProgressMap); } + @Override + public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo jobOffsetInfo) { + String value = YamlEngine.marshal(jobOffsetInfoSwapper.swapToYamlConfiguration(jobOffsetInfo)); + PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobOffsetInfo(jobId, value); + } + + @Override + public JobOffsetInfo getJobOffsetInfo(final String jobId) { + Optional<String> offsetInfo = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobOffsetInfo(jobId); + if (offsetInfo.isPresent()) { + YamlJobOffsetInfo info = YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class); + return jobOffsetInfoSwapper.swapToObject(info); + } + return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo()); + } + @Override public Optional<InventoryIncrementalJobItemProgress> getJobItemProgress(final String jobId, final int shardingItem) { Optional<String> progress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java index d28a50a2519..95e695ebfe1 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java @@ -55,6 +55,17 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP return null != repository.getDirectly(key); } + @Override + public void persistJobOffsetInfo(final String jobId, final String jobOffsetInfo) { + repository.persist(PipelineMetaDataNode.getJobOffsetPath(jobId), jobOffsetInfo); + } + + @Override + public Optional<String> getJobOffsetInfo(final String jobId) { + String text = repository.getDirectly(PipelineMetaDataNode.getJobOffsetPath(jobId)); + return Strings.isNullOrEmpty(text) ? Optional.empty() : Optional.of(text); + } + @Override public void persistJobItemProgress(final String jobId, final int shardingItem, final String progressValue) { repository.persist(PipelineMetaDataNode.getJobOffsetItemPath(jobId, shardingItem), progressValue); diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfo.java similarity index 69% copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java copy to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfo.java index 77e836483c7..b238f1b99e3 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfo.java @@ -15,21 +15,18 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.api.pojo; +package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml; import lombok.Getter; -import lombok.RequiredArgsConstructor; +import lombok.Setter; +import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration; /** - * CDC table based pipeline job info. + * Yaml job offset info. */ @Getter -@RequiredArgsConstructor -public class CDCTableBasedPipelineJobInfo implements PipelineJobInfo { +@Setter +public final class YamlJobOffsetInfo implements YamlConfiguration { - private final PipelineJobMetaData jobMetaData; - - private final String databaseName; - - private final String schemaTableNames; + private boolean targetSchemaTableCreated; } diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfoSwapper.java similarity index 50% rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfoSwapper.java index 77e836483c7..a8efecd574a 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfoSwapper.java @@ -15,21 +15,25 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.api.pojo; +package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml; -import lombok.Getter; -import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo; +import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper; /** - * CDC table based pipeline job info. + * Yaml job offset info swapper. */ -@Getter -@RequiredArgsConstructor -public class CDCTableBasedPipelineJobInfo implements PipelineJobInfo { +public final class YamlJobOffsetInfoSwapper implements YamlConfigurationSwapper<YamlJobOffsetInfo, JobOffsetInfo> { - private final PipelineJobMetaData jobMetaData; + @Override + public YamlJobOffsetInfo swapToYamlConfiguration(final JobOffsetInfo data) { + YamlJobOffsetInfo result = new YamlJobOffsetInfo(); + result.setTargetSchemaTableCreated(data.isTargetSchemaTableCreated()); + return result; + } - private final String databaseName; - - private final String schemaTableNames; + @Override + public JobOffsetInfo swapToObject(final YamlJobOffsetInfo yamlConfig) { + return new JobOffsetInfo(yamlConfig.isTargetSchemaTableCreated()); + } } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java index bfde585bee7..40c3254150d 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java @@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress; import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress; +import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo; import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException; @@ -51,7 +52,6 @@ import org.apache.shardingsphere.mode.lock.GlobalLockDefinition; import java.sql.SQLException; import java.util.Collections; import java.util.Map.Entry; -import java.util.Optional; /** * Migration job preparer. @@ -99,27 +99,22 @@ public final class MigrationJobPreparer { } LockDefinition lockDefinition = new GlobalLockDefinition(lockName); long startTimeMillis = System.currentTimeMillis(); - if (lockContext.tryLock(lockDefinition, 180000)) { + if (lockContext.tryLock(lockDefinition, 600000)) { log.info("try lock success, jobId={}, shardingItem={}, cost {} ms", jobConfig.getJobId(), jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis); try { - Optional<InventoryIncrementalJobItemProgress> jobItemProgress = jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem()); - JobStatus currentStatus = jobItemProgress.map(InventoryIncrementalJobItemProgress::getStatus).orElse(null); - boolean prepareFlag = !jobItemProgress.isPresent() || JobStatus.PREPARING.equals(currentStatus) || JobStatus.RUNNING.equals(currentStatus) - || JobStatus.PREPARING_FAILURE.equals(currentStatus); - if (prepareFlag) { + JobOffsetInfo offsetInfo = jobAPI.getJobOffsetInfo(jobConfig.getJobId()); + if (!offsetInfo.isTargetSchemaTableCreated()) { jobItemContext.setStatus(JobStatus.PREPARING); jobAPI.updateJobItemStatus(jobConfig.getJobId(), jobItemContext.getShardingItem(), JobStatus.PREPARING); prepareAndCheckTarget(jobItemContext); - // TODO Loop insert zookeeper performance is not good - for (int i = 0; i <= jobItemContext.getJobConfig().getJobShardingCount(); i++) { - jobItemContext.setStatus(JobStatus.PREPARE_SUCCESS); - jobAPI.updateJobItemStatus(jobConfig.getJobId(), i, JobStatus.PREPARE_SUCCESS); - } + jobAPI.persistJobOffsetInfo(jobConfig.getJobId(), new JobOffsetInfo(true)); } } finally { log.info("unlock, jobId={}, shardingItem={}, cost {} ms", jobConfig.getJobId(), jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis); lockContext.unlock(lockDefinition); } + } else { + log.warn("jobId={}, shardingItem={} try lock failed", jobConfig.getJobId(), jobItemContext.getShardingItem()); } }