This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1f107c752cd Improve CDC pure incremental mode (#24080)
1f107c752cd is described below
commit 1f107c752cdc6988a4cd72fd9610d29937a61441
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Feb 10 17:29:12 2023 +0800
Improve CDC pure incremental mode (#24080)
* Improve pure incremental mode
* Fix codestyle
---
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 79 ++++++++++++++++------
.../cdc/context/job/CDCJobItemContext.java | 16 ++++-
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 16 +++--
.../AbstractInventoryIncrementalJobAPIImpl.java | 5 +-
4 files changed, 88 insertions(+), 28 deletions(-)
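Note: "pure incremental mode" (SubscriptionMode.INCREMENTAL) means the CDC job skips the inventory (full snapshot) phase and only streams changes made after the subscription is created. A minimal caller-side sketch of the new behavior; the buildParam() helper is hypothetical, since this diff does not show how CreateSubscriptionJobParameter is constructed:

    // Hedged sketch, not part of the commit; only createJob(param) and
    // SubscriptionMode.INCREMENTAL appear in the diff below.
    CreateSubscriptionJobParameter param = buildParam();  // hypothetical helper
    CDCJobAPI jobAPI = new CDCJobAPI();
    // With param.getSubscriptionMode() equal to SubscriptionMode.INCREMENTAL.name(),
    // createJob(...) now also pre-fills each sharding item's start position.
    boolean created = jobAPI.createJob(param);  // false if the job already exists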
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 5708a485a4d..bee9d5c08f6 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -30,15 +30,20 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJ
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
 import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.CreateSubscriptionJobParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
@@ -46,6 +51,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfigurat
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
 import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -54,8 +60,11 @@ import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIn
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerUtils;
 import org.apache.shardingsphere.data.pipeline.core.sharding.ShardingColumnsExtractor;
 import org.apache.shardingsphere.data.pipeline.core.util.JobDataNodeLineConvertUtil;
 import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
@@ -97,21 +106,21 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     /**
      * Create CDC job config.
      *
-     * @param event create CDC job event
-     * @return job id
+     * @param param create CDC job param
+     * @return true if job is not exist, otherwise false
      */
-    public boolean createJob(final CreateSubscriptionJobParameter event) {
+    public boolean createJob(final CreateSubscriptionJobParameter param) {
         YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
-        yamlJobConfig.setDatabase(event.getDatabase());
-        yamlJobConfig.setTableNames(event.getSubscribeTableNames());
-        yamlJobConfig.setSubscriptionName(event.getSubscriptionName());
-        yamlJobConfig.setSubscriptionMode(event.getSubscriptionMode());
-        yamlJobConfig.setDecodeWithTX(event.isDecodeWithTX());
-        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(event.getDatabase());
+        yamlJobConfig.setDatabase(param.getDatabase());
+        yamlJobConfig.setTableNames(param.getSubscribeTableNames());
+        yamlJobConfig.setSubscriptionName(param.getSubscriptionName());
+        yamlJobConfig.setSubscriptionMode(param.getSubscriptionMode());
+        yamlJobConfig.setDecodeWithTX(param.isDecodeWithTX());
+        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabase());
         yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
-        List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtil.convertDataNodesToLines(event.getDataNodesMap());
+        List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtil.convertDataNodesToLines(param.getDataNodesMap());
         yamlJobConfig.setJobShardingDataNodes(jobDataNodeLines.stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
-        JobDataNodeLine tableFirstDataNodes = new JobDataNodeLine(event.getDataNodesMap().entrySet().stream().map(each -> new JobDataNodeEntry(each.getKey(), each.getValue().subList(0, 1)))
+        JobDataNodeLine tableFirstDataNodes = new JobDataNodeLine(param.getDataNodesMap().entrySet().stream().map(each -> new JobDataNodeEntry(each.getKey(), each.getValue().subList(0, 1)))
                 .collect(Collectors.toList()));
         yamlJobConfig.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
         extendYamlJobConfiguration(yamlJobConfig);
@@ -127,9 +136,38 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         JobConfigurationPOJO jobConfigPOJO = convertJobConfiguration(jobConfig);
         jobConfigPOJO.setDisabled(true);
         repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
+        if (SubscriptionMode.INCREMENTAL.name().equals(param.getSubscriptionMode())) {
+            initIncrementalPosition(jobConfig);
+        }
         return true;
     }
 
+    private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
+        PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
+        String jobId = jobConfig.getJobId();
+        try {
+            for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
+                if (getJobItemProgress(jobId, i).isPresent()) {
+                    continue;
+                }
+                TableNameSchemaNameMapping tableNameSchemaNameMapping = getTableNameSchemaNameMapping(jobConfig.getTableNames());
+                DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, i, tableNameSchemaNameMapping);
+                InventoryIncrementalJobItemProgress jobItemProgress = new InventoryIncrementalJobItemProgress();
+                jobItemProgress.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
+                jobItemProgress.setDataSourceName(dumperConfig.getDataSourceName());
+                IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress();
+                incrementalTaskProgress.setPosition(PipelineJobPreparerUtils.getIncrementalPosition(null, dumperConfig, dataSourceManager));
+                jobItemProgress.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
+                jobItemProgress.setStatus(JobStatus.PREPARE_SUCCESS);
+                PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, i, YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
+            }
+        } catch (final SQLException ex) {
+            throw new PrepareJobWithGetBinlogPositionException(jobConfig.getJobId(), ex);
+        } finally {
+            dataSourceManager.close();
+        }
+    }
+
     private ShardingSpherePipelineDataSourceConfiguration getDataSourceConfiguration(final ShardingSphereDatabase database) {
         Map<String, Map<String, Object>> dataSourceProps = new HashMap<>();
         for (Entry<String, DataSource> entry : database.getResourceMetaData().getDataSources().entrySet()) {
@@ -171,13 +209,8 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     @Override
     public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
         CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig;
-        JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
-        Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
-        dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName()))));
         TableNameSchemaNameMapping tableNameSchemaNameMapping = getTableNameSchemaNameMapping(jobConfig.getTableNames());
-        String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
-        StandardPipelineDataSourceConfiguration actualDataSourceConfiguration = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
-        DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, dataSourceName, actualDataSourceConfiguration, tableNameMap, tableNameSchemaNameMapping);
+        DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, jobShardingItem, tableNameSchemaNameMapping);
         ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, jobConfig.getTableNames(), tableNameSchemaNameMapping);
         CDCTaskConfiguration result = new CDCTaskConfiguration(dumperConfig, importerConfig);
         log.debug("buildTaskConfiguration, result={}", result);
@@ -195,12 +228,16 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         return new TableNameSchemaNameMapping(tableNameSchemaMap);
     }
 
-    private static DumperConfiguration buildDumperConfiguration(final CDCJobConfiguration jobConfig, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSourceConfig,
-                                                                final Map<ActualTableName, LogicTableName> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+    private static DumperConfiguration buildDumperConfiguration(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+        JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
+        Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
+        dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName()))));
+        String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
+        StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
         DumperConfiguration result = new DumperConfiguration();
         result.setJobId(jobConfig.getJobId());
         result.setDataSourceName(dataSourceName);
-        result.setDataSourceConfig(sourceDataSourceConfig);
+        result.setDataSourceConfig(actualDataSourceConfig);
         result.setTableNameMap(tableNameMap);
         result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
         result.setDecodeWithTX(jobConfig.isDecodeWithTX());
@@ -252,7 +289,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
 
     @Override
     public void commit(final String jobId) {
-        // TODO to be implemented
+        throw new UnsupportedOperationException();
     }
 
     @Override
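Note on the CDCJobAPI changes above: a pure incremental job never runs the inventory phase, which is what normally records a start position, so initIncrementalPosition(...) captures the position at job-creation time and persists PREPARE_SUCCESS progress per sharding item. The per-item bootstrap, condensed from the diff (identifiers are the commit's own; the null first argument to getIncrementalPosition is assumed to mean "no prior progress, read the current binlog/WAL position from the source"):

    InventoryIncrementalJobItemProgress progress = new InventoryIncrementalJobItemProgress();
    progress.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
    progress.setDataSourceName(dumperConfig.getDataSourceName());
    IncrementalTaskProgress taskProgress = new IncrementalTaskProgress();
    // null initial progress: resolve the source's current position instead of resuming
    taskProgress.setPosition(PipelineJobPreparerUtils.getIncrementalPosition(null, dumperConfig, dataSourceManager));
    progress.setIncremental(new JobItemIncrementalTasksProgress(taskProgress));
    progress.setStatus(JobStatus.PREPARE_SUCCESS);

Moving the data-node resolution into buildDumperConfiguration(jobConfig, jobShardingItem, ...) is what lets this bootstrap build a dumper configuration from a sharding item alone, sharing the logic with buildTaskConfiguration.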
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
index 8d63276264e..f59fd9eb76b 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.cdc.context.job;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
@@ -46,7 +45,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * CDC job item context.
  */
-@RequiredArgsConstructor
 @Getter
 public final class CDCJobItemContext implements InventoryIncrementalJobItemContext {
@@ -94,6 +92,20 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte
         }
     };
 
+    public CDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem, final InventoryIncrementalJobItemProgress initProgress, final CDCProcessContext jobProcessContext,
+                             final CDCTaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager, final ImporterConnector importerConnector) {
+        this.jobConfig = jobConfig;
+        this.shardingItem = shardingItem;
+        this.initProgress = initProgress;
+        this.jobProcessContext = jobProcessContext;
+        this.taskConfig = taskConfig;
+        this.dataSourceManager = dataSourceManager;
+        this.importerConnector = importerConnector;
+        if (null != initProgress) {
+            status = initProgress.getStatus();
+        }
+    }
+
     @Override
     public String getJobId() {
         return jobConfig.getJobId();
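Note on the CDCJobItemContext change above: Lombok's @RequiredArgsConstructor cannot run the extra status-seeding step, hence the hand-written constructor. A usage sketch; the argument values are placeholders, and only the constructor signature and the status seeding come from the diff:

    // A restarted job item resumes with its persisted status instead of the field default.
    CDCJobItemContext context = new CDCJobItemContext(jobConfig, shardingItem, savedProgress,
            processContext, taskConfig, dataSourceManager, importerConnector);
    // If savedProgress is non-null, context.getStatus() now equals savedProgress.getStatus().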
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 334572c1542..cbcee655918 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
@@ -38,6 +39,7 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChanne
 
 import java.sql.SQLException;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * CDC job preparer.
@@ -53,21 +55,27 @@ public final class CDCJobPreparer {
      * @param jobItemContext job item context
      */
     public void prepare(final CDCJobItemContext jobItemContext) {
-        if (!jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem()).isPresent()) {
+        Optional<InventoryIncrementalJobItemProgress> jobItemProgress = jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
+        if (!jobItemProgress.isPresent()) {
             jobAPI.persistJobItemProgress(jobItemContext);
         }
         if (jobItemContext.isStopping()) {
             PipelineJobCenter.stop(jobItemContext.getJobId());
             return;
         }
-        updateJobItemStatus(JobStatus.PREPARING, jobItemContext);
+        boolean needUpdateJobStatus = !jobItemProgress.isPresent() || JobStatus.PREPARING.equals(jobItemContext.getStatus()) || JobStatus.RUNNING.equals(jobItemContext.getStatus())
+                || JobStatus.PREPARING_FAILURE.equals(jobItemContext.getStatus());
+        if (needUpdateJobStatus) {
+            updateJobItemStatus(JobStatus.PREPARING, jobItemContext);
+        }
         initIncrementalTasks(jobItemContext);
         CDCJobConfiguration jobConfig = jobItemContext.getJobConfig();
         if (SubscriptionMode.FULL.name().equals(jobConfig.getSubscriptionMode())) {
             initInventoryTasks(jobItemContext);
         }
-        jobAPI.persistJobItemProgress(jobItemContext);
-        updateJobItemStatus(JobStatus.PREPARE_SUCCESS, jobItemContext);
+        if (needUpdateJobStatus) {
+            updateJobItemStatus(JobStatus.PREPARE_SUCCESS, jobItemContext);
+        }
     }
 
     private void updateJobItemStatus(final JobStatus jobStatus, final CDCJobItemContext jobItemContext) {
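Note on the CDCJobPreparer change above: the guard means PREPARING/PREPARE_SUCCESS are only written for items that are new or were last seen preparing, running, or failed during preparation; an item restarted in any later state keeps its persisted status, and the unconditional persistJobItemProgress call is dropped because pure incremental items already had progress persisted at creation. The same predicate factored into a helper, purely as an illustration (the helper is not part of the commit):

    private static boolean needUpdateJobStatus(final Optional<InventoryIncrementalJobItemProgress> progress, final JobStatus current) {
        return !progress.isPresent()
                || JobStatus.PREPARING.equals(current)
                || JobStatus.RUNNING.equals(current)
                || JobStatus.PREPARING_FAILURE.equals(current);
    }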
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 24c05731da1..c95f79f5357 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
+import lombok.AccessLevel;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
@@ -43,10 +45,10 @@ import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
@@ -70,6 +72,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
 
     private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
 
+    @Getter(AccessLevel.PROTECTED)
     private final YamlInventoryIncrementalJobItemProgressSwapper jobItemProgressSwapper = new YamlInventoryIncrementalJobItemProgressSwapper();
 
     protected abstract String getTargetDatabaseType(PipelineJobConfiguration pipelineJobConfig);
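Note on the AbstractInventoryIncrementalJobAPIImpl change above: @Getter(AccessLevel.PROTECTED) is what lets CDCJobAPI.initIncrementalPosition(...) in the first file call getJobItemProgressSwapper(). Lombok generates approximately this accessor:

    protected YamlInventoryIncrementalJobItemProgressSwapper getJobItemProgressSwapper() {
        return jobItemProgressSwapper;
    }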