sandynz commented on code in PR #24080:
URL: https://github.com/apache/shardingsphere/pull/24080#discussion_r1101436707


##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java:
##########
@@ -127,9 +135,43 @@ public boolean createJob(final 
CreateSubscriptionJobParameter event) {
         JobConfigurationPOJO jobConfigPOJO = 
convertJobConfiguration(jobConfig);
         jobConfigPOJO.setDisabled(true);
         repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
+        if 
(SubscriptionMode.INCREMENTAL.name().equals(param.getSubscriptionMode())) {
+            initIncrementalPosition(jobConfig);
+        }
         return true;
     }
     
+    private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
+        if 
(SubscriptionMode.FULL.name().equals(jobConfig.getSubscriptionMode())) {
+            return;
+        }
+        PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
+        String jobId = jobConfig.getJobId();
+        try {
+            for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
+                if (getJobItemProgress(jobId, i).isPresent()) {
+                    continue;
+                }
+                TableNameSchemaNameMapping tableNameSchemaNameMapping = 
getTableNameSchemaNameMapping(jobConfig.getTableNames());
+                DumperConfiguration dumperConfig = 
buildDumperConfiguration(jobConfig, i, tableNameSchemaNameMapping);
+                InventoryIncrementalJobItemProgress jobItemProgress = new 
InventoryIncrementalJobItemProgress();
+                
jobItemProgress.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
+                
jobItemProgress.setDataSourceName(dumperConfig.getDataSourceName());
+                IncrementalTaskProgress incrementalTaskProgress = new 
IncrementalTaskProgress();
+                
incrementalTaskProgress.setPosition(PipelineJobPreparerUtils.getIncrementalPosition(null,
 dumperConfig, dataSourceManager));
+                jobItemProgress.setIncremental(new 
JobItemIncrementalTasksProgress(incrementalTaskProgress));
+                jobItemProgress.setStatus(JobStatus.PREPARE_SUCCESS);
+                
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, 
i, 
YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
+            }
+        } catch (final SQLException ex) {
+            log.error("Get incremental position failed", ex);
+            //
+            throw new RuntimeException(String.format("Get %s incremental 
position failed", jobConfig.getDatabase()));

Review Comment:
   It's better to use a dedicated exception (a subclass of PipelineSQLException) 
rather than RuntimeException.



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java:
##########
@@ -53,26 +55,25 @@ public final class CDCJobPreparer {
      * @param jobItemContext job item context
      */
     public void prepare(final CDCJobItemContext jobItemContext) {
-        if (!jobAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem()).isPresent()) {
+        Optional<InventoryIncrementalJobItemProgress> jobItemProgress = 
jobAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
+        if (!jobItemProgress.isPresent()) {
             jobAPI.persistJobItemProgress(jobItemContext);
         }
         if (jobItemContext.isStopping()) {
             PipelineJobCenter.stop(jobItemContext.getJobId());
             return;
         }
-        updateJobItemStatus(JobStatus.PREPARING, jobItemContext);
+        boolean needUpdateJobStatus = !jobItemProgress.isPresent() || 
JobStatus.PREPARING.equals(jobItemContext.getStatus()) || 
JobStatus.RUNNING.equals(jobItemContext.getStatus())
+                || 
JobStatus.PREPARING_FAILURE.equals(jobItemContext.getStatus());
+        if (needUpdateJobStatus) {
+            jobItemContext.setStatus(JobStatus.PREPARE_SUCCESS);
+        }
         initIncrementalTasks(jobItemContext);
         CDCJobConfiguration jobConfig = jobItemContext.getJobConfig();
         if 
(SubscriptionMode.FULL.name().equals(jobConfig.getSubscriptionMode())) {
             initInventoryTasks(jobItemContext);
         }
         jobAPI.persistJobItemProgress(jobItemContext);

Review Comment:
   Could we keep `updateJobItemStatus(JobStatus.PREPARING, jobItemContext);` 
and `updateJobItemStatus(JobStatus.PREPARE_SUCCESS, jobItemContext);`,
   and remove `jobAPI.persistJobItemProgress(jobItemContext);`?



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java:
##########
@@ -195,12 +232,16 @@ private TableNameSchemaNameMapping 
getTableNameSchemaNameMapping(final List<Stri
         return new TableNameSchemaNameMapping(tableNameSchemaMap);
     }
     
-    private static DumperConfiguration buildDumperConfiguration(final 
CDCJobConfiguration jobConfig, final String dataSourceName, final 
PipelineDataSourceConfiguration sourceDataSourceConfig,
-                                                                final 
Map<ActualTableName, LogicTableName> tableNameMap, final 
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+    private static DumperConfiguration buildDumperConfiguration(final 
CDCJobConfiguration jobConfig, final int jobShardingItem, final 
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+        JobDataNodeLine dataNodeLine = 
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
+        Map<ActualTableName, LogicTableName> tableNameMap = new 
LinkedHashMap<>();
+        dataNodeLine.getEntries().forEach(each -> 
each.getDataNodes().forEach(node -> tableNameMap.put(new 
ActualTableName(node.getTableName()), new 
LogicTableName(each.getLogicTableName()))));
+        String dataSourceName = 
dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
+        StandardPipelineDataSourceConfiguration actualDataSourceConfiguration 
= 
jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);

Review Comment:
   `actualDataSourceConfiguration` could be renamed to `actualDataSourceConfig`.



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java:
##########
@@ -127,9 +135,43 @@ public boolean createJob(final 
CreateSubscriptionJobParameter event) {
         JobConfigurationPOJO jobConfigPOJO = 
convertJobConfiguration(jobConfig);
         jobConfigPOJO.setDisabled(true);
         repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
+        if 
(SubscriptionMode.INCREMENTAL.name().equals(param.getSubscriptionMode())) {
+            initIncrementalPosition(jobConfig);
+        }
         return true;
     }
     
+    private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
+        if 
(SubscriptionMode.FULL.name().equals(jobConfig.getSubscriptionMode())) {
+            return;
+        }

Review Comment:
   It looks like this block is not required, since 1) there's already an if condition 
before the invocation, and 2) the FULL check doesn't filter out the other options.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to