This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d7d25f0b59 Fix sonar issue of CDCJobAPI (#25659)
0d7d25f0b59 is described below

commit 0d7d25f0b59133eb9e4282366c8a0ea1e16cefdb
Author: Liang Zhang <[email protected]>
AuthorDate: Sun May 14 20:47:45 2023 +0800

    Fix sonar issue of CDCJobAPI (#25659)
---
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      | 158 +++++++++++----------
 .../cdc/config/job/CDCJobConfiguration.java        |   2 +-
 2 files changed, 85 insertions(+), 75 deletions(-)

diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 5ffd47a2619..97931179bdf 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -119,66 +119,45 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
      * @return job id
      */
     public String createJob(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps) {
-        YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
-        yamlJobConfig.setDatabaseName(param.getDatabaseName());
-        yamlJobConfig.setSchemaTableNames(param.getSchemaTableNames());
-        yamlJobConfig.setFull(param.isFull());
-        yamlJobConfig.setDecodeWithTX(param.isDecodeWithTX());
-        YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
-        sinkConfig.setSinkType(sinkType.name());
-        sinkConfig.setProps(sinkProps);
-        yamlJobConfig.setSinkConfig(sinkConfig);
         PipelineContextKey contextKey = PipelineContextKey.buildForProxy(param.getDatabaseName());
-        ShardingSphereDatabase database = PipelineContextManager.getContext(contextKey).getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
-        yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
-        List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtils.convertDataNodesToLines(param.getDataNodesMap());
-        yamlJobConfig.setJobShardingDataNodes(jobDataNodeLines.stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
-        JobDataNodeLine tableFirstDataNodes = new JobDataNodeLine(param.getDataNodesMap().entrySet().stream().map(each -> new JobDataNodeEntry(each.getKey(), each.getValue().subList(0, 1)))
-                .collect(Collectors.toList()));
-        yamlJobConfig.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
+        YamlCDCJobConfiguration yamlJobConfig = getYamlCDCJobConfiguration(param, sinkType, sinkProps, contextKey);
         extendYamlJobConfiguration(contextKey, yamlJobConfig);
         CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
-        String jobId = jobConfig.getJobId();
-        ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId));
-        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
-        String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId);
+        ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
+        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+        String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobConfig.getJobId());
         if (repositoryAPI.isExisted(jobConfigKey)) {
             log.warn("CDC job already exists in registry center, ignore, 
jobConfigKey={}", jobConfigKey);
-            return jobId;
-        }
-        repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), getJobClassName());
-        JobConfigurationPOJO jobConfigPOJO = convertJobConfiguration(jobConfig);
-        jobConfigPOJO.setDisabled(true);
-        repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
-        if (!param.isFull()) {
-            initIncrementalPosition(jobConfig);
+        } else {
+            repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getJobClassName());
+            JobConfigurationPOJO jobConfigPOJO = convertJobConfiguration(jobConfig);
+            jobConfigPOJO.setDisabled(true);
+            repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
+            if (!param.isFull()) {
+                initIncrementalPosition(jobConfig);
+            }
         }
-        return jobId;
+        return jobConfig.getJobId();
     }
     
-    private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
-        PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
-        String jobId = jobConfig.getJobId();
-        try {
-            for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
-                if (getJobItemProgress(jobId, i).isPresent()) {
-                    continue;
-                }
-                TableNameSchemaNameMapping tableNameSchemaNameMapping = getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames());
-                DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, i, tableNameSchemaNameMapping);
-                InventoryIncrementalJobItemProgress jobItemProgress = new InventoryIncrementalJobItemProgress();
-                jobItemProgress.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
-                jobItemProgress.setDataSourceName(dumperConfig.getDataSourceName());
-                IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null, dumperConfig, dataSourceManager));
-                jobItemProgress.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
-                PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId, i,
-                        YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
-            }
-        } catch (final SQLException ex) {
-            throw new PrepareJobWithGetBinlogPositionException(jobConfig.getJobId(), ex);
-        } finally {
-            dataSourceManager.close();
-        }
+    private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps, final PipelineContextKey contextKey) {
+        YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
+        result.setDatabaseName(param.getDatabaseName());
+        result.setSchemaTableNames(param.getSchemaTableNames());
+        result.setFull(param.isFull());
+        result.setDecodeWithTX(param.isDecodeWithTX());
+        YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
+        sinkConfig.setSinkType(sinkType.name());
+        sinkConfig.setProps(sinkProps);
+        result.setSinkConfig(sinkConfig);
+        ShardingSphereDatabase database = PipelineContextManager.getContext(contextKey).getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
+        result.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
+        List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtils.convertDataNodesToLines(param.getDataNodesMap());
+        result.setJobShardingDataNodes(jobDataNodeLines.stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
+        JobDataNodeLine tableFirstDataNodes = new JobDataNodeLine(param.getDataNodesMap().entrySet().stream()
+                .map(each -> new JobDataNodeEntry(each.getKey(), each.getValue().subList(0, 1))).collect(Collectors.toList()));
+        result.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
+        return result;
     }
     
     private ShardingSpherePipelineDataSourceConfiguration getDataSourceConfiguration(final ShardingSphereDatabase database) {
@@ -194,35 +173,41 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
     }
     
-    @Override
-    public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
-        YamlCDCJobConfiguration config = (YamlCDCJobConfiguration) yamlJobConfig;
-        if (null == yamlJobConfig.getJobId()) {
-            config.setJobId(generateJobId(contextKey, config));
-        }
-        if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) {
-            PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(),
-                    config.getDataSourceConfiguration().getParameter());
-            config.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
+    private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
+        PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
+        String jobId = jobConfig.getJobId();
+        try {
+            for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
+                if (getJobItemProgress(jobId, i).isPresent()) {
+                    continue;
+                }
+                DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, i, getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames()));
+                InventoryIncrementalJobItemProgress jobItemProgress = getInventoryIncrementalJobItemProgress(jobConfig, dataSourceManager, dumperConfig);
+                PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(
+                        jobId, i, YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
+            }
+        } catch (final SQLException ex) {
+            throw new PrepareJobWithGetBinlogPositionException(jobConfig.getJobId(), ex);
+        } finally {
+            dataSourceManager.close();
         }
     }
     
-    private String generateJobId(final PipelineContextKey contextKey, final YamlCDCJobConfiguration config) {
-        CDCJobId jobId = new CDCJobId(contextKey, config.getSchemaTableNames(), config.isFull(), config.getSinkConfig().getSinkType());
-        return marshalJobId(jobId);
-    }
-    
-    @Override
-    protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
-        CDCJobId jobId = (CDCJobId) pipelineJobId;
-        String text = Joiner.on('|').join(jobId.getContextKey().getDatabaseName(), jobId.getSchemaTableNames(), jobId.isFull());
-        return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
+    private static InventoryIncrementalJobItemProgress getInventoryIncrementalJobItemProgress(final CDCJobConfiguration jobConfig,
+                                                                                              final PipelineDataSourceManager dataSourceManager,
+                                                                                              final DumperConfiguration dumperConfig) throws SQLException {
+        InventoryIncrementalJobItemProgress result = new InventoryIncrementalJobItemProgress();
+        result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
+        result.setDataSourceName(dumperConfig.getDataSourceName());
+        IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null, dumperConfig, dataSourceManager));
+        result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
+        return result;
     }
     
     /**
      * Start job.
      *
-     * @param jobId             job id
+     * @param jobId job id
      * @param importerConnector importer connector
      */
     public void startJob(final String jobId, final ImporterConnector importerConnector) {
@@ -238,7 +223,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     /**
      * Update job configuration disabled.
      *
-     * @param jobId    job id
+     * @param jobId job id
      * @param disabled disabled
      */
     public void updateJobConfigurationDisabled(final String jobId, final boolean disabled) {
@@ -253,6 +238,31 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
     }
     
+    @Override
+    public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
+        YamlCDCJobConfiguration config = (YamlCDCJobConfiguration) yamlJobConfig;
+        if (null == yamlJobConfig.getJobId()) {
+            config.setJobId(generateJobId(contextKey, config));
+        }
+        if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) {
+            PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(),
+                    config.getDataSourceConfiguration().getParameter());
+            config.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
+        }
+    }
+    
+    private String generateJobId(final PipelineContextKey contextKey, final YamlCDCJobConfiguration config) {
+        CDCJobId jobId = new CDCJobId(contextKey, config.getSchemaTableNames(), config.isFull(), config.getSinkConfig().getSinkType());
+        return marshalJobId(jobId);
+    }
+    
+    @Override
+    protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
+        CDCJobId jobId = (CDCJobId) pipelineJobId;
+        String text = Joiner.on('|').join(jobId.getContextKey().getDatabaseName(), jobId.getSchemaTableNames(), jobId.isFull());
+        return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
+    }
+    
     @Override
     public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
         CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig;
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
index 8df7a2783b6..fa0b79c341c 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
@@ -30,8 +30,8 @@ import java.util.Properties;
 /**
  * CDC job configuration.
  */
-@Getter
 @RequiredArgsConstructor
+@Getter
 public final class CDCJobConfiguration implements PipelineJobConfiguration {
     
     private final String jobId;
