This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 725b0647eaa Refactor CDCJobAPI (#32482)
725b0647eaa is described below

commit 725b0647eaa7a1f7edb66f4eeab224f0e7fd0e69
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Aug 13 05:20:09 2024 +0800

    Refactor CDCJobAPI (#32482)
    
    * Refactor CDCJobAPI
    
    * Refactor CDCJobAPI
---
 .../data/pipeline/cdc/api/CDCJobAPI.java              | 19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)

diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index c51301bac87..bb56972dcfe 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -201,8 +201,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
         JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
         String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
         StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
-        return new IncrementalDumperContext(
-                new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), tableAndSchemaNameMapper),
+        return new IncrementalDumperContext(new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), tableAndSchemaNameMapper),
                 jobConfig.getJobId(), jobConfig.isDecodeWithTX());
     }
     
@@ -281,19 +280,15 @@ public final class CDCJobAPI implements TransmissionJobAPI {
      * @param jobId job id
      * @return job item infos
      */
-    public List<CDCJobItemInfo> getJobItemInfos(final String jobId) {
+    public Collection<CDCJobItemInfo> getJobItemInfos(final String jobId) {
         CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobType).getJobConfiguration(jobId);
         ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(jobConfig.getDatabaseName());
-        Collection<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobType).getJobItemInfos(jobId);
-        List<CDCJobItemInfo> result = new LinkedList<>();
-        for (TransmissionJobItemInfo each : jobItemInfos) {
+        Collection<CDCJobItemInfo> result = new LinkedList<>();
+        for (TransmissionJobItemInfo each : new TransmissionJobManager(jobType).getJobItemInfos(jobId)) {
             TransmissionJobItemProgress jobItemProgress = each.getJobItemProgress();
-            if (null == jobItemProgress) {
-                result.add(new CDCJobItemInfo(each, "", ""));
-                continue;
-            }
-            result.add(new CDCJobItemInfo(each, jobItemProgress.getIncremental().getIncrementalPosition().map(Object::toString).orElse(""),
-                    getCurrentPosition(database, jobItemProgress.getDataSourceName())));
+            String confirmedPosition = null == jobItemProgress ? "" : jobItemProgress.getIncremental().getIncrementalPosition().map(Object::toString).orElse("");
+            String currentPosition = null == jobItemProgress ? "" : getCurrentPosition(database, jobItemProgress.getDataSourceName());
+            result.add(new CDCJobItemInfo(each, confirmedPosition, currentPosition));
         }
         return result;
     }

Reply via email to