sandynz commented on code in PR #24080:
URL: https://github.com/apache/shardingsphere/pull/24080#discussion_r1101436707
##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java:
##########
@@ -127,9 +135,43 @@ public boolean createJob(final
CreateSubscriptionJobParameter event) {
JobConfigurationPOJO jobConfigPOJO =
convertJobConfiguration(jobConfig);
jobConfigPOJO.setDisabled(true);
repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
+ if
(SubscriptionMode.INCREMENTAL.name().equals(param.getSubscriptionMode())) {
+ initIncrementalPosition(jobConfig);
+ }
return true;
}
+ private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
+ if
(SubscriptionMode.FULL.name().equals(jobConfig.getSubscriptionMode())) {
+ return;
+ }
+ PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
+ String jobId = jobConfig.getJobId();
+ try {
+ for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
+ if (getJobItemProgress(jobId, i).isPresent()) {
+ continue;
+ }
+ TableNameSchemaNameMapping tableNameSchemaNameMapping =
getTableNameSchemaNameMapping(jobConfig.getTableNames());
+ DumperConfiguration dumperConfig =
buildDumperConfiguration(jobConfig, i, tableNameSchemaNameMapping);
+ InventoryIncrementalJobItemProgress jobItemProgress = new
InventoryIncrementalJobItemProgress();
+
jobItemProgress.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
+
jobItemProgress.setDataSourceName(dumperConfig.getDataSourceName());
+ IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress();
+
incrementalTaskProgress.setPosition(PipelineJobPreparerUtils.getIncrementalPosition(null,
dumperConfig, dataSourceManager));
+ jobItemProgress.setIncremental(new
JobItemIncrementalTasksProgress(incrementalTaskProgress));
+ jobItemProgress.setStatus(JobStatus.PREPARE_SUCCESS);
+
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId,
i,
YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
+ }
+ } catch (final SQLException ex) {
+ log.error("Get incremental position failed", ex);
+ //
+ throw new RuntimeException(String.format("Get %s incremental
position failed", jobConfig.getDatabase()));
Review Comment:
It's better to use dedicated exception (sub-class of PipelineSQLException),
but not RuntimeException
##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java:
##########
@@ -53,26 +55,25 @@ public final class CDCJobPreparer {
* @param jobItemContext job item context
*/
public void prepare(final CDCJobItemContext jobItemContext) {
- if (!jobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem()).isPresent()) {
+ Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
jobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+ if (!jobItemProgress.isPresent()) {
jobAPI.persistJobItemProgress(jobItemContext);
}
if (jobItemContext.isStopping()) {
PipelineJobCenter.stop(jobItemContext.getJobId());
return;
}
- updateJobItemStatus(JobStatus.PREPARING, jobItemContext);
+ boolean needUpdateJobStatus = !jobItemProgress.isPresent() ||
JobStatus.PREPARING.equals(jobItemContext.getStatus()) ||
JobStatus.RUNNING.equals(jobItemContext.getStatus())
+ ||
JobStatus.PREPARING_FAILURE.equals(jobItemContext.getStatus());
+ if (needUpdateJobStatus) {
+ jobItemContext.setStatus(JobStatus.PREPARE_SUCCESS);
+ }
initIncrementalTasks(jobItemContext);
CDCJobConfiguration jobConfig = jobItemContext.getJobConfig();
if
(SubscriptionMode.FULL.name().equals(jobConfig.getSubscriptionMode())) {
initInventoryTasks(jobItemContext);
}
jobAPI.persistJobItemProgress(jobItemContext);
Review Comment:
Could we keep `updateJobItemStatus(JobStatus.PREPARING, jobItemContext);`
and `updateJobItemStatus(JobStatus.PREPARE_SUCCESS, jobItemContext);`;
And remove `jobAPI.persistJobItemProgress(jobItemContext);`?
##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java:
##########
@@ -195,12 +232,16 @@ private TableNameSchemaNameMapping
getTableNameSchemaNameMapping(final List<Stri
return new TableNameSchemaNameMapping(tableNameSchemaMap);
}
- private static DumperConfiguration buildDumperConfiguration(final
CDCJobConfiguration jobConfig, final String dataSourceName, final
PipelineDataSourceConfiguration sourceDataSourceConfig,
- final
Map<ActualTableName, LogicTableName> tableNameMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+ private static DumperConfiguration buildDumperConfiguration(final
CDCJobConfiguration jobConfig, final int jobShardingItem, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+ JobDataNodeLine dataNodeLine =
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
+ Map<ActualTableName, LogicTableName> tableNameMap = new
LinkedHashMap<>();
+ dataNodeLine.getEntries().forEach(each ->
each.getDataNodes().forEach(node -> tableNameMap.put(new
ActualTableName(node.getTableName()), new
LogicTableName(each.getLogicTableName()))));
+ String dataSourceName =
dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
+ StandardPipelineDataSourceConfiguration actualDataSourceConfiguration
=
jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
Review Comment:
`actualDataSourceConfiguration` could be `actualDataSourceConfig`
##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java:
##########
@@ -127,9 +135,43 @@ public boolean createJob(final
CreateSubscriptionJobParameter event) {
JobConfigurationPOJO jobConfigPOJO =
convertJobConfiguration(jobConfig);
jobConfigPOJO.setDisabled(true);
repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
+ if
(SubscriptionMode.INCREMENTAL.name().equals(param.getSubscriptionMode())) {
+ initIncrementalPosition(jobConfig);
+ }
return true;
}
+ private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
+ if
(SubscriptionMode.FULL.name().equals(jobConfig.getSubscriptionMode())) {
+ return;
+ }
Review Comment:
Looks this block code is not required. Since 1) There's already if condition
before invocation, 2) The FULL check doesn't filter other options.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]