This is an automated email from the ASF dual-hosted git repository.

tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 703e317baca Refactor dataSourceName in job item progress structure 
(#20880)
703e317baca is described below

commit 703e317baca412c32b19bf6556e2be5a7f70183d
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Sep 8 17:50:45 2022 +0800

    Refactor dataSourceName in job item progress structure (#20880)
    
    * Refactor dataSourceName in job item progress structure
    
    * Revise 19836, Continuous Integration could run on forked repository
---
 .github/workflows/ci.yml                           |  2 --
 .../ShowMigrationJobStatusQueryResultSet.java      |  2 +-
 .../api/config/ingest/DumperConfiguration.java     |  2 +-
 .../api/context/PipelineJobItemContext.java        |  7 +++++
 .../InventoryIncrementalJobItemProgress.java       |  2 ++
 .../progress/JobItemIncrementalTasksProgress.java  | 34 +++++++---------------
 .../impl/InventoryIncrementalJobItemAPIImpl.java   |  9 ++----
 .../YamlInventoryIncrementalJobItemProgress.java   |  2 ++
 ...InventoryIncrementalJobItemProgressSwapper.java |  2 ++
 .../yaml/YamlJobItemIncrementalTasksProgress.java  |  2 --
 ...YamlJobItemIncrementalTasksProgressSwapper.java | 23 +++++++--------
 .../core/prepare/PipelineJobPreparerUtils.java     |  2 +-
 .../scenario/migration/MigrationJobAPIImpl.java    |  2 +-
 .../migration/MigrationJobItemContext.java         |  3 ++
 .../migration/MigrationTaskConfiguration.java      |  2 ++
 .../InventoryIncrementalJobItemProgressTest.java   |  5 ++--
 ...ntoryIncrementalJobItemProgressSwapperTest.java |  6 ++--
 .../test/resources/job-progress-all-finished.yaml  |  2 +-
 .../src/test/resources/job-progress-failure.yaml   |  2 ++
 .../src/test/resources/job-progress-running.yaml   |  2 +-
 .../src/test/resources/job-progress.yaml           |  2 +-
 21 files changed, 56 insertions(+), 59 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a36319f7907..c63bfd778ff 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -57,7 +57,6 @@ env:
 
 jobs:
   linux:
-    if: github.repository == 'apache/shardingsphere'
     name: JDK ${{ matrix.java_version }} - on ${{ matrix.os }}
     runs-on: ${{ matrix.os }}
     timeout-minutes: 60
@@ -88,7 +87,6 @@ jobs:
   
   
   shardingsphere-example-generator:
-    if: github.repository == 'apache/shardingsphere'
     name: ShardingSphere example generator
     runs-on: ubuntu-latest
     steps:
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
index 2aa717e9f48..4256fc45793 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
@@ -48,7 +48,7 @@ public final class ShowMigrationJobStatusQueryResultSet 
implements DatabaseDistS
                     Collection<Object> result = new LinkedList<>();
                     result.add(entry.getKey());
                     if (null != entry.getValue()) {
-                        
result.add(entry.getValue().getIncremental().getDataSourceName());
+                        result.add(entry.getValue().getDataSourceName());
                         result.add(entry.getValue().getStatus());
                         result.add(entry.getValue().isActive() ? 
Boolean.TRUE.toString() : Boolean.FALSE.toString());
                         
result.add(entry.getValue().getInventory().getInventoryFinishedPercentage());
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
index e47aa2be450..f9847eae00a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
@@ -33,7 +33,7 @@ import java.util.Map;
  */
 @Getter
 @Setter
-@ToString(exclude = "dataSourceConfig")
+@ToString(exclude = { "dataSourceConfig", "tableNameSchemaNameMapping" })
 // TODO it should be final and not extends by sub-class
 // TODO fields final
 public class DumperConfiguration {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobItemContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobItemContext.java
index eb4d51f7f6e..f436ef35003 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobItemContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobItemContext.java
@@ -39,6 +39,13 @@ public interface PipelineJobItemContext {
      */
     int getShardingItem();
     
+    /**
+     * Get data source name.
+     *
+     * @return data source name
+     */
+    String getDataSourceName();
+    
     /**
      * Get job status.
      *
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
index d1aceda3232..4f22da7dfaf 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
@@ -32,6 +32,8 @@ public final class InventoryIncrementalJobItemProgress 
implements PipelineJobIte
     
     private String sourceDatabaseType;
     
+    private String dataSourceName;
+    
     private boolean active;
     
     private JobItemInventoryTasksProgress inventory;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
index b4065d087f6..837832a40f8 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
@@ -17,15 +17,13 @@
 
 package org.apache.shardingsphere.data.pipeline.api.job.progress;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 
+import java.util.Optional;
+
 /**
  * Job item incremental tasks progress.
  */
@@ -33,27 +31,15 @@ import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTask
 @Getter
 public final class JobItemIncrementalTasksProgress {
     
-    private final Map<String, IncrementalTaskProgress> 
incrementalTaskProgressMap;
+    private final IncrementalTaskProgress incrementalTaskProgress;
     
     /**
      * Get incremental position.
-     * 
-     * @param dataSourceName data source name
-     * @return incremental position
-     */
-    public Optional<IngestPosition<?>> getIncrementalPosition(final String 
dataSourceName) {
-        Optional<IncrementalTaskProgress> incrementalTaskProgress = 
incrementalTaskProgressMap.entrySet().stream()
-                .filter(entry -> 
dataSourceName.equals(entry.getKey())).map(Map.Entry::getValue).findAny();
-        return 
incrementalTaskProgress.map(IncrementalTaskProgress::getPosition);
-    }
-    
-    /**
-     * Get data source name.
      *
-     * @return data source
+     * @return incremental position
      */
-    public String getDataSourceName() {
-        return 
incrementalTaskProgressMap.keySet().stream().findAny().orElse("");
+    public Optional<IngestPosition<?>> getIncrementalPosition() {
+        return null != incrementalTaskProgress ? 
Optional.of(incrementalTaskProgress.getPosition()) : Optional.empty();
     }
     
     /**
@@ -62,9 +48,9 @@ public final class JobItemIncrementalTasksProgress {
      * @return latest active time, <code>0</code> means there is no activity
      */
     public long getIncrementalLatestActiveTimeMillis() {
-        List<Long> delays = incrementalTaskProgressMap.values().stream()
-                .map(each -> 
each.getIncrementalTaskDelay().getLatestActiveTimeMillis())
-                .collect(Collectors.toList());
-        return delays.stream().reduce(Long::max).orElse(0L);
+        if (null == incrementalTaskProgress) {
+            return 0L;
+        }
+        return 
incrementalTaskProgress.getIncrementalTaskDelay().getLatestActiveTimeMillis();
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
index 1d1910bdcb5..ec1302c76df 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
@@ -24,7 +24,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
-import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
@@ -53,6 +52,7 @@ public final class InventoryIncrementalJobItemAPIImpl 
implements PipelineJobItem
         InventoryIncrementalJobItemProgress jobItemProgress = new 
InventoryIncrementalJobItemProgress();
         jobItemProgress.setStatus(jobItemContext.getStatus());
         
jobItemProgress.setSourceDatabaseType(jobItemContext.getJobConfig().getSourceDatabaseType());
+        jobItemProgress.setDataSourceName(jobItemContext.getDataSourceName());
         
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
         
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
         String value = 
YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress));
@@ -60,11 +60,8 @@ public final class InventoryIncrementalJobItemAPIImpl 
implements PipelineJobItem
     }
     
     private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final 
Collection<IncrementalTask> incrementalTasks) {
-        Map<String, IncrementalTaskProgress> incrementalTaskProgressMap = new 
HashMap<>();
-        for (IncrementalTask each : incrementalTasks) {
-            incrementalTaskProgressMap.put(each.getTaskId(), 
each.getTaskProgress());
-        }
-        return new JobItemIncrementalTasksProgress(incrementalTaskProgressMap);
+        IncrementalTask incrementalTask = incrementalTasks.size() > 0 ? 
incrementalTasks.iterator().next() : null;
+        return new JobItemIncrementalTasksProgress(null != incrementalTask ? 
incrementalTask.getTaskProgress() : null);
     }
     
     private JobItemInventoryTasksProgress getInventoryTasksProgress(final 
Collection<InventoryTask> inventoryTasks) {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
index 31fa9250eea..de052fe82ef 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
@@ -32,6 +32,8 @@ public final class YamlInventoryIncrementalJobItemProgress 
implements YamlConfig
     
     private String sourceDatabaseType;
     
+    private String dataSourceName;
+    
     private YamlJobItemInventoryTasksProgress inventory;
     
     private YamlJobItemIncrementalTasksProgress incremental;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
index b25151cbb79..6624e8bd750 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
@@ -35,6 +35,7 @@ public final class 
YamlInventoryIncrementalJobItemProgressSwapper implements Yam
         YamlInventoryIncrementalJobItemProgress result = new 
YamlInventoryIncrementalJobItemProgress();
         result.setStatus(progress.getStatus().name());
         result.setSourceDatabaseType(progress.getSourceDatabaseType());
+        result.setDataSourceName(progress.getDataSourceName());
         
result.setInventory(INVENTORY_PROGRESS_SWAPPER.swapToYaml(progress.getInventory()));
         
result.setIncremental(INCREMENTAL_PROGRESS_SWAPPER.swapToYaml(progress.getIncremental()));
         return result;
@@ -45,6 +46,7 @@ public final class 
YamlInventoryIncrementalJobItemProgressSwapper implements Yam
         InventoryIncrementalJobItemProgress result = new 
InventoryIncrementalJobItemProgress();
         result.setStatus(JobStatus.valueOf(yamlProgress.getStatus()));
         result.setSourceDatabaseType(yamlProgress.getSourceDatabaseType());
+        result.setDataSourceName(yamlProgress.getDataSourceName());
         
result.setInventory(INVENTORY_PROGRESS_SWAPPER.swapToObject(yamlProgress.getInventory()));
         
result.setIncremental(INCREMENTAL_PROGRESS_SWAPPER.swapToObject(yamlProgress.getSourceDatabaseType(),
 yamlProgress.getIncremental()));
         return result;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgress.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgress.java
index c7070c10a3f..ad1edd2385c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgress.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgress.java
@@ -28,8 +28,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTask
 @Setter
 public final class YamlJobItemIncrementalTasksProgress {
     
-    private String dataSourceName;
-    
     private String position;
     
     private IncrementalTaskDelay delay;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
index 2452490c192..197c6ccf29a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
@@ -21,8 +21,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrement
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializerFactory;
 
-import java.util.Collections;
-
 /**
  * YAML job item incremental tasks progress swapper.
  */
@@ -38,15 +36,14 @@ public final class 
YamlJobItemIncrementalTasksProgressSwapper {
         if (null == progress) {
             return new YamlJobItemIncrementalTasksProgress();
         }
-        return progress.getIncrementalTaskProgressMap()
-                .entrySet().stream()
-                .map(entry -> {
-                    YamlJobItemIncrementalTasksProgress result = new 
YamlJobItemIncrementalTasksProgress();
-                    result.setDataSourceName(entry.getKey());
-                    
result.setPosition(entry.getValue().getPosition().toString());
-                    
result.setDelay(entry.getValue().getIncrementalTaskDelay());
-                    return result;
-                }).findAny().orElse(new YamlJobItemIncrementalTasksProgress());
+        IncrementalTaskProgress incrementalTaskProgress = 
progress.getIncrementalTaskProgress();
+        if (null == incrementalTaskProgress) {
+            return new YamlJobItemIncrementalTasksProgress();
+        }
+        YamlJobItemIncrementalTasksProgress result = new 
YamlJobItemIncrementalTasksProgress();
+        
result.setPosition(progress.getIncrementalTaskProgress().getPosition().toString());
+        
result.setDelay(progress.getIncrementalTaskProgress().getIncrementalTaskDelay());
+        return result;
     }
     
     /**
@@ -58,12 +55,12 @@ public final class 
YamlJobItemIncrementalTasksProgressSwapper {
      */
     public JobItemIncrementalTasksProgress swapToObject(final String 
databaseType, final YamlJobItemIncrementalTasksProgress yamlProgress) {
         if (null == yamlProgress) {
-            return new JobItemIncrementalTasksProgress(Collections.emptyMap());
+            return new JobItemIncrementalTasksProgress(null);
         }
         IncrementalTaskProgress taskProgress = new IncrementalTaskProgress();
         // TODO databaseType
         
taskProgress.setPosition(PositionInitializerFactory.getInstance(databaseType).init(yamlProgress.getPosition()));
         taskProgress.setIncrementalTaskDelay(yamlProgress.getDelay());
-        return new 
JobItemIncrementalTasksProgress(Collections.singletonMap(yamlProgress.getDataSourceName(),
 taskProgress));
+        return new JobItemIncrementalTasksProgress(taskProgress);
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index 99087c7358e..d1ea79d9981 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -128,7 +128,7 @@ public final class PipelineJobPreparerUtils {
     public static IngestPosition<?> getIncrementalPosition(final 
JobItemIncrementalTasksProgress initIncremental, final DumperConfiguration 
dumperConfig,
                                                            final 
PipelineDataSourceManager dataSourceManager) throws SQLException {
         if (null != initIncremental) {
-            Optional<IngestPosition<?>> position = 
initIncremental.getIncrementalPosition(dumperConfig.getDataSourceName());
+            Optional<IngestPosition<?>> position = 
initIncremental.getIncrementalPosition();
             if (position.isPresent()) {
                 return position.get();
             }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 4be428d86ed..df60ac463b9 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -205,7 +205,7 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
         DumperConfiguration dumperConfig = 
buildDumperConfiguration(jobConfig.getJobId(), 
jobConfig.getSourceResourceName(), jobConfig.getSource(), tableNameMap, 
tableNameSchemaNameMapping);
         // TODO now shardingColumnsMap always empty,
         ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, pipelineProcessConfig, 
Collections.emptyMap(), tableNameSchemaNameMapping);
-        MigrationTaskConfiguration result = new 
MigrationTaskConfiguration(createTableConfig, dumperConfig, importerConfig);
+        MigrationTaskConfiguration result = new 
MigrationTaskConfiguration(jobConfig.getSourceResourceName(), 
createTableConfig, dumperConfig, importerConfig);
         log.info("buildTaskConfiguration, sourceResourceName={}, result={}", 
jobConfig.getSourceResourceName(), result);
         return result;
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index 2904932b9e0..ca19765cf46 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -49,6 +49,8 @@ public final class MigrationJobItemContext implements 
InventoryIncrementalJobIte
     
     private final int shardingItem;
     
+    private final String dataSourceName;
+    
     private volatile boolean stopping;
     
     private volatile JobStatus status = JobStatus.RUNNING;
@@ -88,6 +90,7 @@ public final class MigrationJobItemContext implements 
InventoryIncrementalJobIte
         this.jobConfig = jobConfig;
         jobId = jobConfig.getJobId();
         this.shardingItem = shardingItem;
+        this.dataSourceName = taskConfig.getDataSourceName();
         this.initProgress = initProgress;
         this.jobProcessContext = jobProcessContext;
         this.taskConfig = taskConfig;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
index 52672769aab..8420bbaf6f1 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
@@ -33,6 +33,8 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 @ToString
 public final class MigrationTaskConfiguration implements 
PipelineTaskConfiguration {
     
+    private final String dataSourceName;
+    
     private final CreateTableConfiguration createTableConfig;
     
     private final DumperConfiguration dumperConfig;
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/InventoryIncrementalJobItemProgressTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/InventoryIncrementalJobItemProgressTest.java
index fcd322e009b..efc158c5ccc 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/InventoryIncrementalJobItemProgressTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/InventoryIncrementalJobItemProgressTest.java
@@ -33,6 +33,7 @@ import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -46,13 +47,13 @@ public final class InventoryIncrementalJobItemProgressTest {
         assertThat(actual.getStatus(), is(JobStatus.RUNNING));
         assertThat(actual.getSourceDatabaseType(), is("H2"));
         assertThat(actual.getInventory().getInventoryTaskProgressMap().size(), 
is(4));
-        
assertThat(actual.getIncremental().getIncrementalTaskProgressMap().size(), 
is(1));
+        assertNotNull(actual.getIncremental().getIncrementalTaskProgress());
     }
     
     @Test
     public void assertGetIncrementalPosition() {
         InventoryIncrementalJobItemProgress actual = 
getJobItemProgress(ConfigurationFileUtil.readFile("job-progress.yaml"));
-        Optional<IngestPosition<?>> position = 
actual.getIncremental().getIncrementalPosition("ds0");
+        Optional<IngestPosition<?>> position = 
actual.getIncremental().getIncrementalPosition();
         assertTrue(position.isPresent());
         assertThat(position.get(), instanceOf(PlaceholderPosition.class));
     }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
index f00810fc2c5..ebc466c0caf 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
@@ -37,12 +37,12 @@ public final class 
YamlInventoryIncrementalJobItemProgressSwapperTest {
         YamlInventoryIncrementalJobItemProgress actual = 
SWAPPER.swapToYamlConfiguration(progress);
         assertThat(actual.getStatus(), is("RUNNING"));
         assertThat(actual.getSourceDatabaseType(), is("H2"));
+        assertThat(actual.getDataSourceName(), is("ds_0"));
         assertThat(actual.getInventory().getFinished().length, is(2));
         assertArrayEquals(actual.getInventory().getFinished(), new 
String[]{"ds0.t_2", "ds0.t_1"});
         assertThat(actual.getInventory().getUnfinished().size(), is(2));
         assertThat(actual.getInventory().getUnfinished().get("ds1.t_2"), 
is("i,1,2"));
         assertThat(actual.getInventory().getUnfinished().get("ds1.t_1"), 
is(""));
-        assertThat(actual.getIncremental().getDataSourceName(), is("ds0"));
         assertThat(actual.getIncremental().getPosition().length(), is(0));
     }
     
@@ -60,7 +60,7 @@ public final class 
YamlInventoryIncrementalJobItemProgressSwapperTest {
         assertNotNull(progress.getInventory());
         assertNotNull(progress.getIncremental());
         assertThat(progress.getInventory().getInventoryFinishedPercentage(), 
is(0));
-        assertThat(progress.getIncremental().getDataSourceName(), is(""));
+        assertThat(progress.getDataSourceName(), is("ds_0"));
         
assertThat(progress.getIncremental().getIncrementalLatestActiveTimeMillis(), 
is(0L));
         YamlInventoryIncrementalJobItemProgress actual = 
SWAPPER.swapToYamlConfiguration(progress);
         assertNotNull(actual.getInventory());
@@ -75,7 +75,7 @@ public final class 
YamlInventoryIncrementalJobItemProgressSwapperTest {
         assertNotNull(progress.getInventory());
         assertNotNull(progress.getIncremental());
         assertThat(progress.getInventory().getInventoryFinishedPercentage(), 
is(0));
-        assertThat(progress.getIncremental().getDataSourceName(), is("ds_0"));
+        assertThat(progress.getDataSourceName(), is("ds_0"));
         
assertThat(progress.getIncremental().getIncrementalLatestActiveTimeMillis(), 
is(0L));
         YamlInventoryIncrementalJobItemProgress actual = 
SWAPPER.swapToYamlConfiguration(progress);
         assertNotNull(actual.getInventory());
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml
index 0ae217af602..8a99cbfade1 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml
@@ -15,8 +15,8 @@
 # limitations under the License.
 #
 
+dataSourceName: ds_0
 incremental:
-  dataSourceName: ds0
   delay:
     lastEventTimestamps: 0
     latestActiveTimeMillis: 50
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-failure.yaml
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-failure.yaml
index ce53fea4473..e3fc461f8e0 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-failure.yaml
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-failure.yaml
@@ -15,5 +15,7 @@
 # limitations under the License.
 #
 #
+
+dataSourceName: ds_0
 sourceDatabaseType: H2
 status: PREPARING_FAILURE
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-running.yaml
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-running.yaml
index c71ae9fd49f..f904a7669a1 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-running.yaml
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-running.yaml
@@ -16,8 +16,8 @@
 #
 #
 
+dataSourceName: ds_0
 incremental:
-  dataSourceName: ds_0
   delay:
     lastEventTimestamps: 0
     latestActiveTimeMillis: 0
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress.yaml
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress.yaml
index 61fe5448960..fa82043980c 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress.yaml
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress.yaml
@@ -15,8 +15,8 @@
 # limitations under the License.
 #
 
+dataSourceName: ds_0
 incremental:
-  dataSourceName: ds0
   delay:
     lastEventTimestamps: 0
     latestActiveTimeMillis: 0

Reply via email to