This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0df0325d9b0 Add CDCJobAPI (#29220)
0df0325d9b0 is described below
commit 0df0325d9b0aad185eb2b269a3bd214fb79b4d24
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Nov 27 23:17:41 2023 +0800
Add CDCJobAPI (#29220)
* Refactor MigrationJobOption
* Add CDCJobAPI
---
.../core/job/option/TransmissionJobOption.java | 16 +-
.../handler/update/DropStreamingUpdater.java | 8 +-
.../api/impl/{CDCJobOption.java => CDCJobAPI.java} | 168 ++++------------
.../data/pipeline/cdc/api/impl/CDCJobOption.java | 222 ++-------------------
.../data/pipeline/cdc/core/job/CDCJob.java | 17 +-
.../pipeline/cdc/handler/CDCBackendHandler.java | 15 +-
...ta.pipeline.core.job.service.TransmissionJobAPI | 18 ++
.../migration/api/impl/MigrationJobOption.java | 45 ++---
8 files changed, 132 insertions(+), 377 deletions(-)
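In short: the CDC job lifecycle operations (create/start/disable/drop) move out of CDCJobOption into the new CDCJobAPI, and callers obtain that API through ShardingSphere's typed SPI instead of constructing it directly, while CDCJobOption keeps only the TransmissionJobOption concerns. A minimal sketch of the lookup pattern the diff below adopts (the wrapper class is illustrative, not part of this commit):

import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

public final class StreamingJobApiLookup {
    
    // Resolve the "STREAMING" implementation of TransmissionJobAPI and cast to
    // the concrete CDCJobAPI, exactly as DropStreamingUpdater, CDCJob, and
    // CDCBackendHandler now do in this commit.
    public static CDCJobAPI streamingJobAPI() {
        return (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
    }
}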
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
index edf23ebc525..a9a630ec205 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
@@ -47,6 +47,14 @@ public interface TransmissionJobOption extends PipelineJobOption {
*/
PipelineJobInfo getJobInfo(String jobId);
+ /**
+ * Extend YAML job configuration.
+ *
+ * @param contextKey context key
+ * @param yamlJobConfig YAML job configuration
+ */
+ void extendYamlJobConfiguration(PipelineContextKey contextKey, YamlPipelineJobConfiguration yamlJobConfig);
+
/**
* Build task configuration.
*
@@ -65,14 +73,6 @@ public interface TransmissionJobOption extends PipelineJobOption {
*/
TransmissionProcessContext buildProcessContext(PipelineJobConfiguration jobConfig);
- /**
- * Extend YAML job configuration.
- *
- * @param contextKey context key
- * @param yamlJobConfig YAML job configuration
- */
- void extendYamlJobConfiguration(PipelineContextKey contextKey, YamlPipelineJobConfiguration yamlJobConfig);
-
/**
* Build pipeline data consistency checker.
*
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
index 87a6a73050e..3befad8d811 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
@@ -18,8 +18,10 @@
package org.apache.shardingsphere.cdc.distsql.handler.update;
import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
-import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import java.sql.SQLException;
@@ -28,11 +30,11 @@ import java.sql.SQLException;
*/
public final class DropStreamingUpdater implements RALUpdater<DropStreamingStatement> {
- private final CDCJobOption jobAPI = new CDCJobOption();
+ private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
@Override
public void executeUpdate(final String databaseName, final DropStreamingStatement sqlStatement) throws SQLException {
- jobAPI.dropStreaming(sqlStatement.getJobId());
+ jobAPI.drop(sqlStatement.getJobId());
}
@Override
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobOption.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
similarity index 64%
copy from kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobOption.java
copy to kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 48f6bf3bb61..ef06ee45dcf 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobOption.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -17,46 +17,28 @@
package org.apache.shardingsphere.data.pipeline.cdc.api.impl;
-import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
-import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
-import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
-import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
-import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
@@ -66,12 +48,11 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
+import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
@@ -87,20 +68,18 @@ import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigur
import java.sql.SQLException;
import java.time.LocalDateTime;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
-import java.util.Set;
import java.util.stream.Collectors;
/**
- * CDC job option.
+ * CDC job API.
*/
@Slf4j
-public final class CDCJobOption implements TransmissionJobOption {
+public final class CDCJobAPI implements TransmissionJobAPI {
private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
@@ -108,26 +87,28 @@ public final class CDCJobOption implements TransmissionJobOption {
private final YamlPipelineDataSourceConfigurationSwapper pipelineDataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
+ private final CDCJobOption jobOption = new CDCJobOption();
+
/**
- * Create CDC job config.
+ * Create CDC job.
*
- * @param param create CDC job param
+ * @param param CDC job parameter
* @param sinkType sink type
* @param sinkProps sink properties
* @return job id
*/
- public String createJob(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps) {
+ public String create(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps) {
PipelineContextKey contextKey = new PipelineContextKey(param.getDatabaseName(), InstanceType.PROXY);
YamlCDCJobConfiguration yamlJobConfig = getYamlCDCJobConfiguration(param, sinkType, sinkProps, contextKey);
- extendYamlJobConfiguration(contextKey, yamlJobConfig);
+ jobOption.extendYamlJobConfiguration(contextKey, yamlJobConfig);
CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
if (governanceFacade.getJobFacade().getConfiguration().isExisted(jobConfig.getJobId())) {
log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId());
} else {
- governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), getJobClass());
- JobConfigurationPOJO jobConfigPOJO = new PipelineJobConfigurationManager(this).convertToJobConfigurationPOJO(jobConfig);
+ governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), jobOption.getJobClass());
+ JobConfigurationPOJO jobConfigPOJO = new PipelineJobConfigurationManager(jobOption).convertToJobConfigurationPOJO(jobConfig);
jobConfigPOJO.setDisabled(true);
governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(), jobConfigPOJO);
if (!param.isFull()) {
@@ -171,7 +152,7 @@ public final class CDCJobOption implements TransmissionJobOption {
private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
- PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
+ PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
try (PipelineDataSourceManager pipelineDataSourceManager = new DefaultPipelineDataSourceManager()) {
for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
if (jobItemManager.getProgress(jobId, i).isPresent()) {
@@ -180,13 +161,22 @@ public final class CDCJobOption implements TransmissionJobOption {
IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
TransmissionJobItemProgress jobItemProgress = getTransmissionJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist(
- jobId, i, YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
+ jobId, i, YamlEngine.marshal(jobOption.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
}
} catch (final SQLException ex) {
throw new PrepareJobWithGetBinlogPositionException(jobId, ex);
}
}
+ private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+ JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
+ String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
+ StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
+ return new IncrementalDumperContext(
+ new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), tableAndSchemaNameMapper),
+ jobConfig.getJobId(), jobConfig.isDecodeWithTX());
+ }
+
private static TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig,
final PipelineDataSourceManager dataSourceManager,
final IncrementalDumperContext incrementalDumperContext) throws SQLException {
@@ -199,118 +189,51 @@ public final class CDCJobOption implements TransmissionJobOption {
}
/**
- * Start job.
+ * Start CDC job.
*
* @param jobId job id
* @param sink sink
*/
- public void startJob(final String jobId, final PipelineSink sink) {
+ public void start(final String jobId, final PipelineSink sink) {
CDCJob job = new CDCJob(jobId, sink);
PipelineJobCenter.addJob(jobId, job);
- updateJobConfigurationDisabled(jobId, false);
+ enable(jobId);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfigPOJO.toJobConfiguration());
job.setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
}
+ private void enable(final String jobId) {
+ JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
+ jobConfigPOJO.setDisabled(false);
+ jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
+ jobConfigPOJO.getProps().remove("stop_time_millis");
+ PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
+ }
+
/**
- * Update job configuration disabled.
+ * Disable CDC job.
*
* @param jobId job id
- * @param disabled disabled
*/
- public void updateJobConfigurationDisabled(final String jobId, final boolean disabled) {
+ public void disable(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- jobConfigPOJO.setDisabled(disabled);
- if (disabled) {
- jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()));
- jobConfigPOJO.getProps().setProperty("stop_time_millis", String.valueOf(System.currentTimeMillis()));
- } else {
- jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
- jobConfigPOJO.getProps().remove("stop_time_millis");
- }
+ jobConfigPOJO.setDisabled(true);
+ jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()));
+ jobConfigPOJO.getProps().setProperty("stop_time_millis", String.valueOf(System.currentTimeMillis()));
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
}
- @Override
- public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
- YamlCDCJobConfiguration config = (YamlCDCJobConfiguration) yamlJobConfig;
- if (null == yamlJobConfig.getJobId()) {
- config.setJobId(new CDCJobId(contextKey, config.getSchemaTableNames(), config.isFull(), config.getSinkConfig().getSinkType()).marshal());
- }
- if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) {
- PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(),
- config.getDataSourceConfiguration().getParameter());
- config.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
- }
- }
-
- @Override
- public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
- CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig;
- TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
- IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, tableAndSchemaNameMapper);
- ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
- CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, importerConfig);
- log.debug("buildTaskConfiguration, result={}", result);
- return result;
- }
-
- private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
- JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
- String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
- StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
- return new IncrementalDumperContext(
- new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), tableAndSchemaNameMapper),
- jobConfig.getJobId(), jobConfig.isDecodeWithTX());
- }
-
- private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final Collection<String> schemaTableNames,
- final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
- PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
- jobConfig.getDataSourceConfig().getParameter());
- CDCProcessContext processContext = new CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
- JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm();
- int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
- Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
- .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
- return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
- }
-
- @Override
- public CDCProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) {
- TransmissionJobManager jobManager = new TransmissionJobManager(this);
- return new CDCProcessContext(jobConfig.getJobId(), jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())));
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
- return new YamlCDCJobConfigurationSwapper();
- }
-
- @Override
- public PipelineJobInfo getJobInfo(final String jobId) {
- PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
- CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
- return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
- }
-
- @Override
- public boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
- return true;
- }
-
/**
- * Drop streaming job.
+ * Drop CDC job.
*
* @param jobId job id
*/
- public void dropStreaming(final String jobId) {
- CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
+ public void drop(final String jobId) {
+ CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(jobId);
ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(),
() -> new PipelineInternalException("Can't drop streaming job which is active"));
- new PipelineJobManager(this).drop(jobId);
+ new PipelineJobManager(jobOption).drop(jobId);
cleanup(jobConfig);
}
@@ -325,14 +248,11 @@ public final class CDCJobOption implements TransmissionJobOption {
}
@Override
- public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext,
- final ConsistencyCheckJobItemProgressContext progressContext) {
- throw new UnsupportedOperationException();
+ public void commit(final String jobId) throws SQLException {
}
@Override
- public Class<CDCJob> getJobClass() {
- return CDCJob.class;
+ public void rollback(final String jobId) throws SQLException {
}
@Override
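Taken together, the new CDCJobAPI above yields this lifecycle contract: create() persists the job disabled in the registry center, start() enables it and bootstraps a one-off ElasticJob, disable() stamps stop_time/stop_time_millis, and drop() refuses a job that is still enabled. A hypothetical caller sequence, assuming a running Proxy pipeline context and a prepared StreamDataParameter:

import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

import java.util.Properties;

public final class CdcLifecycleSketch {
    
    public static void run(final StreamDataParameter param) {
        CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
        // create(...) registers the job disabled and returns its generated id.
        String jobId = jobAPI.create(param, CDCSinkType.SOCKET, new Properties());
        // disable(...) marks it stopped; drop(...) checks isDisabled() first and
        // otherwise throws PipelineInternalException("Can't drop streaming job which is active").
        jobAPI.disable(jobId);
        jobAPI.drop(jobId);
    }
}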
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobOption.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobOption.java
index 48f6bf3bb61..5408034eaac 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobOption.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobOption.java
@@ -20,79 +20,40 @@ package org.apache.shardingsphere.data.pipeline.cdc.api.impl;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
-import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
-import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
-import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
-import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
-import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
-import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
-import java.sql.SQLException;
-import java.time.LocalDateTime;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -102,135 +63,27 @@ import java.util.stream.Collectors;
@Slf4j
public final class CDCJobOption implements TransmissionJobOption {
- private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
-
- private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
-
- private final YamlPipelineDataSourceConfigurationSwapper pipelineDataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
-
- /**
- * Create CDC job config.
- *
- * @param param create CDC job param
- * @param sinkType sink type
- * @param sinkProps sink properties
- * @return job id
- */
- public String createJob(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps) {
- PipelineContextKey contextKey = new PipelineContextKey(param.getDatabaseName(), InstanceType.PROXY);
- YamlCDCJobConfiguration yamlJobConfig = getYamlCDCJobConfiguration(param, sinkType, sinkProps, contextKey);
- extendYamlJobConfiguration(contextKey, yamlJobConfig);
- CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
- ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
- PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
- if (governanceFacade.getJobFacade().getConfiguration().isExisted(jobConfig.getJobId())) {
- log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId());
- } else {
- governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), getJobClass());
- JobConfigurationPOJO jobConfigPOJO = new PipelineJobConfigurationManager(this).convertToJobConfigurationPOJO(jobConfig);
- jobConfigPOJO.setDisabled(true);
- governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(), jobConfigPOJO);
- if (!param.isFull()) {
- initIncrementalPosition(jobConfig);
- }
- }
- return jobConfig.getJobId();
- }
-
- private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps, final PipelineContextKey contextKey) {
- YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
- result.setDatabaseName(param.getDatabaseName());
- result.setSchemaTableNames(param.getSchemaTableNames());
- result.setFull(param.isFull());
- result.setDecodeWithTX(param.isDecodeWithTX());
- YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
- sinkConfig.setSinkType(sinkType.name());
- sinkConfig.setProps(sinkProps);
- result.setSinkConfig(sinkConfig);
- ShardingSphereDatabase database = PipelineContextManager.getContext(contextKey).getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
- result.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
- List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtils.convertDataNodesToLines(param.getDataNodesMap());
- result.setJobShardingDataNodes(jobDataNodeLines.stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
- JobDataNodeLine tableFirstDataNodes = new JobDataNodeLine(param.getDataNodesMap().entrySet().stream()
- .map(each -> new JobDataNodeEntry(each.getKey(), each.getValue().subList(0, 1))).collect(Collectors.toList()));
- result.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
- return result;
- }
-
- private ShardingSpherePipelineDataSourceConfiguration getDataSourceConfiguration(final ShardingSphereDatabase database) {
- Map<String, Map<String, Object>> dataSourcePoolProps = new HashMap<>();
- for (Entry<String, StorageUnit> entry : database.getResourceMetaData().getStorageUnits().entrySet()) {
- dataSourcePoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
- }
- YamlRootConfiguration targetRootConfig = new YamlRootConfiguration();
- targetRootConfig.setDatabaseName(database.getName());
- targetRootConfig.setDataSources(dataSourcePoolProps);
- targetRootConfig.setRules(ruleConfigSwapperEngine.swapToYamlRuleConfigurations(database.getRuleMetaData().getConfigurations()));
- return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
- }
-
- private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
- String jobId = jobConfig.getJobId();
- PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
- try (PipelineDataSourceManager pipelineDataSourceManager = new DefaultPipelineDataSourceManager()) {
- for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
- if (jobItemManager.getProgress(jobId, i).isPresent()) {
- continue;
- }
- IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
- TransmissionJobItemProgress jobItemProgress = getTransmissionJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext);
- PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist(
- jobId, i, YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
- }
- } catch (final SQLException ex) {
- throw new PrepareJobWithGetBinlogPositionException(jobId, ex);
- }
+ @SuppressWarnings("unchecked")
+ @Override
+ public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
+ return new YamlCDCJobConfigurationSwapper();
}
- private static TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig,
- final PipelineDataSourceManager dataSourceManager,
- final IncrementalDumperContext incrementalDumperContext) throws SQLException {
- TransmissionJobItemProgress result = new TransmissionJobItemProgress();
- result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
- result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
- IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null, incrementalDumperContext, dataSourceManager));
- result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
- return result;
+ @Override
+ public Class<CDCJob> getJobClass() {
+ return CDCJob.class;
}
- /**
- * Start job.
- *
- * @param jobId job id
- * @param sink sink
- */
- public void startJob(final String jobId, final PipelineSink sink) {
- CDCJob job = new CDCJob(jobId, sink);
- PipelineJobCenter.addJob(jobId, job);
- updateJobConfigurationDisabled(jobId, false);
- JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfigPOJO.toJobConfiguration());
- job.setJobBootstrap(oneOffJobBootstrap);
- oneOffJobBootstrap.execute();
+ @Override
+ public boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
+ return true;
}
- /**
- * Update job configuration disabled.
- *
- * @param jobId job id
- * @param disabled disabled
- */
- public void updateJobConfigurationDisabled(final String jobId, final boolean disabled) {
- JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- jobConfigPOJO.setDisabled(disabled);
- if (disabled) {
- jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()));
- jobConfigPOJO.getProps().setProperty("stop_time_millis", String.valueOf(System.currentTimeMillis()));
- } else {
- jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
- jobConfigPOJO.getProps().remove("stop_time_millis");
- }
- PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
+ @Override
+ public PipelineJobInfo getJobInfo(final String jobId) {
+ PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
+ return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
}
@Override
@@ -284,57 +137,12 @@ public final class CDCJobOption implements TransmissionJobOption {
return new CDCProcessContext(jobConfig.getJobId(), jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())));
}
- @SuppressWarnings("unchecked")
- @Override
- public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
- return new YamlCDCJobConfigurationSwapper();
- }
-
- @Override
- public PipelineJobInfo getJobInfo(final String jobId) {
- PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
- CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
- return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
- }
-
- @Override
- public boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
- return true;
- }
-
- /**
- * Drop streaming job.
- *
- * @param jobId job id
- */
- public void dropStreaming(final String jobId) {
- CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
- ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(),
- () -> new PipelineInternalException("Can't drop streaming job which is active"));
- new PipelineJobManager(this).drop(jobId);
- cleanup(jobConfig);
- }
-
- private void cleanup(final CDCJobConfiguration jobConfig) {
- for (Entry<String, Map<String, Object>> entry : jobConfig.getDataSourceConfig().getRootConfig().getDataSources().entrySet()) {
- try {
- PipelineJobPreparerUtils.destroyPosition(jobConfig.getJobId(), new StandardPipelineDataSourceConfiguration(entry.getValue()));
- } catch (final SQLException ex) {
- log.warn("job destroying failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex);
- }
- }
- }
-
@Override
public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext,
final ConsistencyCheckJobItemProgressContext progressContext) {
throw new UnsupportedOperationException();
}
- @Override
- public Class<CDCJob> getJobClass() {
- return CDCJob.class;
- }
-
@Override
public String getType() {
return "STREAMING";
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 79d53ea16f6..7f477377486 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.job;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
@@ -44,10 +45,12 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import java.util.Collection;
@@ -65,9 +68,11 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
@Getter
private final PipelineSink sink;
- private final CDCJobOption jobAPI = new CDCJobOption();
+ private final CDCJobOption jobOption = new CDCJobOption();
- private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+ private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
+
+ private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
@@ -109,8 +114,8 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
private CDCJobItemContext buildPipelineJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
- CDCProcessContext jobProcessContext = jobAPI.buildProcessContext(jobConfig);
- CDCTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
+ CDCProcessContext jobProcessContext = jobOption.buildProcessContext(jobConfig);
+ CDCTaskConfiguration taskConfig = jobOption.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink);
}
@@ -131,7 +136,7 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
PipelineJobCenter.stop(jobId);
- jobAPI.updateJobConfigurationDisabled(jobId, true);
+ jobAPI.disable(jobId);
}
private void executeInventoryTasks(final List<CDCJobItemContext> jobItemContexts) {
@@ -212,7 +217,7 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", throwable.getMessage()));
}
PipelineJobCenter.stop(jobId);
- jobAPI.updateJobConfigurationDisabled(jobId, true);
+ jobAPI.disable(jobId);
}
}
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 849ab9a61c0..a5079896438 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -21,6 +21,7 @@ import com.google.common.base.Strings;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption;
import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
@@ -48,12 +49,14 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNot
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
+import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import java.util.Collection;
import java.util.HashSet;
@@ -70,9 +73,9 @@ import java.util.stream.Collectors;
@Slf4j
public final class CDCBackendHandler {
- private final CDCJobOption jobAPI = new CDCJobOption();
+ private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
- private final PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(jobAPI);
+ private final PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(new CDCJobOption());
/**
* Get database name by job ID.
@@ -115,7 +118,7 @@ public final class CDCBackendHandler {
ShardingSpherePreconditions.checkState(!actualDataNodesMap.isEmpty(), () -> new PipelineInvalidParameterException(String.format("Not find table %s", tableNames)));
boolean decodeWithTx = database.getProtocolType() instanceof OpenGaussDatabaseType;
StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new LinkedList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx);
- String jobId = jobAPI.createJob(parameter, CDCSinkType.SOCKET, new Properties());
+ String jobId = jobAPI.create(parameter, CDCSinkType.SOCKET, new Properties());
connectionContext.setJobId(jobId);
startStreaming(jobId, connectionContext, channel);
return CDCResponseUtils.succeed(requestId, ResponseCase.STREAM_DATA_RESULT, StreamDataResult.newBuilder().setStreamingId(jobId).build());
@@ -135,7 +138,7 @@ public final class CDCBackendHandler {
PipelineJobCenter.stop(jobId);
}
ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
- jobAPI.startJob(jobId, new CDCSocketSink(channel, database, cdcJobConfig.getSchemaTableNames()));
+ jobAPI.start(jobId, new CDCSocketSink(channel, database, cdcJobConfig.getSchemaTableNames()));
connectionContext.setJobId(jobId);
}
@@ -157,7 +160,7 @@ public final class CDCBackendHandler {
if (job.getSink().identifierMatched(channelId)) {
log.info("close CDC job, channel id: {}", channelId);
PipelineJobCenter.stop(jobId);
- jobAPI.updateJobConfigurationDisabled(jobId, true);
+ jobAPI.disable(jobId);
}
}
@@ -167,7 +170,7 @@ public final class CDCBackendHandler {
* @param jobId job ID
*/
public void dropStreaming(final String jobId) {
- jobAPI.dropStreaming(jobId);
+ jobAPI.drop(jobId);
}
/**
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI b/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI
new file mode 100644
index 00000000000..b500cdc9f58
--- /dev/null
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI
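The new resource above is a standard Java SPI registration: the file name is the fully qualified service interface and each non-comment line names an implementation. Plain JDK ServiceLoader can enumerate it; ShardingSphere's TypedSPILoader layers type-keyed selection (here, "STREAMING") on top of the same registration. A rough illustration of the underlying mechanism, not of TypedSPILoader's internals:

import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;

import java.util.ServiceLoader;

public final class SpiRegistrationProbe {
    
    public static void main(final String[] args) {
        // Instantiates every TransmissionJobAPI registered under META-INF/services,
        // including the CDCJobAPI added by this commit.
        for (TransmissionJobAPI each : ServiceLoader.load(TransmissionJobAPI.class)) {
            System.out.println(each.getClass().getName());
        }
    }
}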
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
index c62eb313f46..c98f837421d 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
@@ -58,7 +58,6 @@ import org.apache.shardingsphere.infra.datanode.DataNode;
import java.util.Collection;
import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -70,10 +69,31 @@ import java.util.stream.Collectors;
@Slf4j
public final class MigrationJobOption implements TransmissionJobOption {
+ @SuppressWarnings("unchecked")
+ @Override
+ public YamlMigrationJobConfigurationSwapper getYamlJobConfigurationSwapper() {
+ return new YamlMigrationJobConfigurationSwapper();
+ }
+
+ @Override
+ public Class<MigrationJob> getJobClass() {
+ return MigrationJob.class;
+ }
+
+ @Override
+ public Optional<String> getToBeStartDisabledNextJobType() {
+ return Optional.of("CONSISTENCY_CHECK");
+ }
+
+ @Override
+ public Optional<String> getToBeStoppedPreviousJobType() {
+ return Optional.of("CONSISTENCY_CHECK");
+ }
+
@Override
public PipelineJobInfo getJobInfo(final String jobId) {
PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
- List<String> sourceTables = new LinkedList<>();
+ Collection<String> sourceTables = new LinkedList<>();
new PipelineJobConfigurationManager(this).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
.forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables));
@@ -87,12 +107,6 @@ public final class MigrationJobOption implements TransmissionJobOption {
}
}
- @SuppressWarnings("unchecked")
- @Override
- public YamlMigrationJobConfigurationSwapper getYamlJobConfigurationSwapper() {
- return new YamlMigrationJobConfigurationSwapper();
- }
-
@Override
public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig;
@@ -149,21 +163,6 @@ public final class MigrationJobOption implements TransmissionJobOption {
return new MigrationDataConsistencyChecker((MigrationJobConfiguration) jobConfig, processContext, progressContext);
}
- @Override
- public Optional<String> getToBeStartDisabledNextJobType() {
- return Optional.of("CONSISTENCY_CHECK");
- }
-
- @Override
- public Optional<String> getToBeStoppedPreviousJobType() {
- return Optional.of("CONSISTENCY_CHECK");
- }
-
- @Override
- public Class<MigrationJob> getJobClass() {
- return MigrationJob.class;
- }
-
@Override
public String getType() {
return "MIGRATION";