This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 703e317baca Refactor dataSourceName in job item progress structure
(#20880)
703e317baca is described below
commit 703e317baca412c32b19bf6556e2be5a7f70183d
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Sep 8 17:50:45 2022 +0800
Refactor dataSourceName in job item progress structure (#20880)
* Refactor dataSourceName in job item progress structure
* Revise #19836 so that Continuous Integration can run on forked repositories
---
.github/workflows/ci.yml | 2 --
.../ShowMigrationJobStatusQueryResultSet.java | 2 +-
.../api/config/ingest/DumperConfiguration.java | 2 +-
.../api/context/PipelineJobItemContext.java | 7 +++++
.../InventoryIncrementalJobItemProgress.java | 2 ++
.../progress/JobItemIncrementalTasksProgress.java | 34 +++++++---------------
.../impl/InventoryIncrementalJobItemAPIImpl.java | 9 ++----
.../YamlInventoryIncrementalJobItemProgress.java | 2 ++
...InventoryIncrementalJobItemProgressSwapper.java | 2 ++
.../yaml/YamlJobItemIncrementalTasksProgress.java | 2 --
...YamlJobItemIncrementalTasksProgressSwapper.java | 23 +++++++--------
.../core/prepare/PipelineJobPreparerUtils.java | 2 +-
.../scenario/migration/MigrationJobAPIImpl.java | 2 +-
.../migration/MigrationJobItemContext.java | 3 ++
.../migration/MigrationTaskConfiguration.java | 2 ++
.../InventoryIncrementalJobItemProgressTest.java | 5 ++--
...ntoryIncrementalJobItemProgressSwapperTest.java | 6 ++--
.../test/resources/job-progress-all-finished.yaml | 2 +-
.../src/test/resources/job-progress-failure.yaml | 2 ++
.../src/test/resources/job-progress-running.yaml | 2 +-
.../src/test/resources/job-progress.yaml | 2 +-
21 files changed, 56 insertions(+), 59 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a36319f7907..c63bfd778ff 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -57,7 +57,6 @@ env:
jobs:
linux:
- if: github.repository == 'apache/shardingsphere'
name: JDK ${{ matrix.java_version }} - on ${{ matrix.os }}
runs-on: ${{ matrix.os }}
timeout-minutes: 60
@@ -88,7 +87,6 @@ jobs:
shardingsphere-example-generator:
- if: github.repository == 'apache/shardingsphere'
name: ShardingSphere example generator
runs-on: ubuntu-latest
steps:
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
index 2aa717e9f48..4256fc45793 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
@@ -48,7 +48,7 @@ public final class ShowMigrationJobStatusQueryResultSet
implements DatabaseDistS
Collection<Object> result = new LinkedList<>();
result.add(entry.getKey());
if (null != entry.getValue()) {
-
result.add(entry.getValue().getIncremental().getDataSourceName());
+ result.add(entry.getValue().getDataSourceName());
result.add(entry.getValue().getStatus());
result.add(entry.getValue().isActive() ?
Boolean.TRUE.toString() : Boolean.FALSE.toString());
result.add(entry.getValue().getInventory().getInventoryFinishedPercentage());
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
index e47aa2be450..f9847eae00a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
@@ -33,7 +33,7 @@ import java.util.Map;
*/
@Getter
@Setter
-@ToString(exclude = "dataSourceConfig")
+@ToString(exclude = { "dataSourceConfig", "tableNameSchemaNameMapping" })
// TODO it should be final and not extends by sub-class
// TODO fields final
public class DumperConfiguration {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobItemContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobItemContext.java
index eb4d51f7f6e..f436ef35003 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobItemContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobItemContext.java
@@ -39,6 +39,13 @@ public interface PipelineJobItemContext {
*/
int getShardingItem();
+ /**
+ * Get data source name.
+ *
+ * @return data source name
+ */
+ String getDataSourceName();
+
/**
* Get job status.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
index d1aceda3232..4f22da7dfaf 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
@@ -32,6 +32,8 @@ public final class InventoryIncrementalJobItemProgress
implements PipelineJobIte
private String sourceDatabaseType;
+ private String dataSourceName;
+
private boolean active;
private JobItemInventoryTasksProgress inventory;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
index b4065d087f6..837832a40f8 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
@@ -17,15 +17,13 @@
package org.apache.shardingsphere.data.pipeline.api.job.progress;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
+import java.util.Optional;
+
/**
* Job item incremental tasks progress.
*/
@@ -33,27 +31,15 @@ import
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTask
@Getter
public final class JobItemIncrementalTasksProgress {
- private final Map<String, IncrementalTaskProgress>
incrementalTaskProgressMap;
+ private final IncrementalTaskProgress incrementalTaskProgress;
/**
* Get incremental position.
- *
- * @param dataSourceName data source name
- * @return incremental position
- */
- public Optional<IngestPosition<?>> getIncrementalPosition(final String
dataSourceName) {
- Optional<IncrementalTaskProgress> incrementalTaskProgress =
incrementalTaskProgressMap.entrySet().stream()
- .filter(entry ->
dataSourceName.equals(entry.getKey())).map(Map.Entry::getValue).findAny();
- return
incrementalTaskProgress.map(IncrementalTaskProgress::getPosition);
- }
-
- /**
- * Get data source name.
*
- * @return data source
+ * @return incremental position
*/
- public String getDataSourceName() {
- return
incrementalTaskProgressMap.keySet().stream().findAny().orElse("");
+ public Optional<IngestPosition<?>> getIncrementalPosition() {
+ return null != incrementalTaskProgress ?
Optional.of(incrementalTaskProgress.getPosition()) : Optional.empty();
}
/**
@@ -62,9 +48,9 @@ public final class JobItemIncrementalTasksProgress {
* @return latest active time, <code>0</code> means there is no activity
*/
public long getIncrementalLatestActiveTimeMillis() {
- List<Long> delays = incrementalTaskProgressMap.values().stream()
- .map(each ->
each.getIncrementalTaskDelay().getLatestActiveTimeMillis())
- .collect(Collectors.toList());
- return delays.stream().reduce(Long::max).orElse(0L);
+ if (null == incrementalTaskProgress) {
+ return 0L;
+ }
+ return
incrementalTaskProgress.getIncrementalTaskDelay().getLatestActiveTimeMillis();
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
index 1d1910bdcb5..ec1302c76df 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
@@ -24,7 +24,6 @@ import
org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
-import
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
import
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
@@ -53,6 +52,7 @@ public final class InventoryIncrementalJobItemAPIImpl
implements PipelineJobItem
InventoryIncrementalJobItemProgress jobItemProgress = new
InventoryIncrementalJobItemProgress();
jobItemProgress.setStatus(jobItemContext.getStatus());
jobItemProgress.setSourceDatabaseType(jobItemContext.getJobConfig().getSourceDatabaseType());
+ jobItemProgress.setDataSourceName(jobItemContext.getDataSourceName());
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
String value =
YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress));
@@ -60,11 +60,8 @@ public final class InventoryIncrementalJobItemAPIImpl
implements PipelineJobItem
}
private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final
Collection<IncrementalTask> incrementalTasks) {
- Map<String, IncrementalTaskProgress> incrementalTaskProgressMap = new
HashMap<>();
- for (IncrementalTask each : incrementalTasks) {
- incrementalTaskProgressMap.put(each.getTaskId(),
each.getTaskProgress());
- }
- return new JobItemIncrementalTasksProgress(incrementalTaskProgressMap);
+ IncrementalTask incrementalTask = incrementalTasks.size() > 0 ?
incrementalTasks.iterator().next() : null;
+ return new JobItemIncrementalTasksProgress(null != incrementalTask ?
incrementalTask.getTaskProgress() : null);
}
private JobItemInventoryTasksProgress getInventoryTasksProgress(final
Collection<InventoryTask> inventoryTasks) {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
index 31fa9250eea..de052fe82ef 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
@@ -32,6 +32,8 @@ public final class YamlInventoryIncrementalJobItemProgress
implements YamlConfig
private String sourceDatabaseType;
+ private String dataSourceName;
+
private YamlJobItemInventoryTasksProgress inventory;
private YamlJobItemIncrementalTasksProgress incremental;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
index b25151cbb79..6624e8bd750 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
@@ -35,6 +35,7 @@ public final class
YamlInventoryIncrementalJobItemProgressSwapper implements Yam
YamlInventoryIncrementalJobItemProgress result = new
YamlInventoryIncrementalJobItemProgress();
result.setStatus(progress.getStatus().name());
result.setSourceDatabaseType(progress.getSourceDatabaseType());
+ result.setDataSourceName(progress.getDataSourceName());
result.setInventory(INVENTORY_PROGRESS_SWAPPER.swapToYaml(progress.getInventory()));
result.setIncremental(INCREMENTAL_PROGRESS_SWAPPER.swapToYaml(progress.getIncremental()));
return result;
@@ -45,6 +46,7 @@ public final class
YamlInventoryIncrementalJobItemProgressSwapper implements Yam
InventoryIncrementalJobItemProgress result = new
InventoryIncrementalJobItemProgress();
result.setStatus(JobStatus.valueOf(yamlProgress.getStatus()));
result.setSourceDatabaseType(yamlProgress.getSourceDatabaseType());
+ result.setDataSourceName(yamlProgress.getDataSourceName());
result.setInventory(INVENTORY_PROGRESS_SWAPPER.swapToObject(yamlProgress.getInventory()));
result.setIncremental(INCREMENTAL_PROGRESS_SWAPPER.swapToObject(yamlProgress.getSourceDatabaseType(),
yamlProgress.getIncremental()));
return result;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgress.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgress.java
index c7070c10a3f..ad1edd2385c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgress.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgress.java
@@ -28,8 +28,6 @@ import
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTask
@Setter
public final class YamlJobItemIncrementalTasksProgress {
- private String dataSourceName;
-
private String position;
private IncrementalTaskDelay delay;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
index 2452490c192..197c6ccf29a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
@@ -21,8 +21,6 @@ import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrement
import
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializerFactory;
-import java.util.Collections;
-
/**
* YAML job item incremental tasks progress swapper.
*/
@@ -38,15 +36,14 @@ public final class
YamlJobItemIncrementalTasksProgressSwapper {
if (null == progress) {
return new YamlJobItemIncrementalTasksProgress();
}
- return progress.getIncrementalTaskProgressMap()
- .entrySet().stream()
- .map(entry -> {
- YamlJobItemIncrementalTasksProgress result = new
YamlJobItemIncrementalTasksProgress();
- result.setDataSourceName(entry.getKey());
-
result.setPosition(entry.getValue().getPosition().toString());
-
result.setDelay(entry.getValue().getIncrementalTaskDelay());
- return result;
- }).findAny().orElse(new YamlJobItemIncrementalTasksProgress());
+ IncrementalTaskProgress incrementalTaskProgress =
progress.getIncrementalTaskProgress();
+ if (null == incrementalTaskProgress) {
+ return new YamlJobItemIncrementalTasksProgress();
+ }
+ YamlJobItemIncrementalTasksProgress result = new
YamlJobItemIncrementalTasksProgress();
+
result.setPosition(progress.getIncrementalTaskProgress().getPosition().toString());
+
result.setDelay(progress.getIncrementalTaskProgress().getIncrementalTaskDelay());
+ return result;
}
/**
@@ -58,12 +55,12 @@ public final class
YamlJobItemIncrementalTasksProgressSwapper {
*/
public JobItemIncrementalTasksProgress swapToObject(final String
databaseType, final YamlJobItemIncrementalTasksProgress yamlProgress) {
if (null == yamlProgress) {
- return new JobItemIncrementalTasksProgress(Collections.emptyMap());
+ return new JobItemIncrementalTasksProgress(null);
}
IncrementalTaskProgress taskProgress = new IncrementalTaskProgress();
// TODO databaseType
taskProgress.setPosition(PositionInitializerFactory.getInstance(databaseType).init(yamlProgress.getPosition()));
taskProgress.setIncrementalTaskDelay(yamlProgress.getDelay());
- return new
JobItemIncrementalTasksProgress(Collections.singletonMap(yamlProgress.getDataSourceName(),
taskProgress));
+ return new JobItemIncrementalTasksProgress(taskProgress);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index 99087c7358e..d1ea79d9981 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -128,7 +128,7 @@ public final class PipelineJobPreparerUtils {
public static IngestPosition<?> getIncrementalPosition(final
JobItemIncrementalTasksProgress initIncremental, final DumperConfiguration
dumperConfig,
final
PipelineDataSourceManager dataSourceManager) throws SQLException {
if (null != initIncremental) {
- Optional<IngestPosition<?>> position =
initIncremental.getIncrementalPosition(dumperConfig.getDataSourceName());
+ Optional<IngestPosition<?>> position =
initIncremental.getIncrementalPosition();
if (position.isPresent()) {
return position.get();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 4be428d86ed..df60ac463b9 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -205,7 +205,7 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
DumperConfiguration dumperConfig =
buildDumperConfiguration(jobConfig.getJobId(),
jobConfig.getSourceResourceName(), jobConfig.getSource(), tableNameMap,
tableNameSchemaNameMapping);
// TODO now shardingColumnsMap always empty,
ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
Collections.emptyMap(), tableNameSchemaNameMapping);
- MigrationTaskConfiguration result = new
MigrationTaskConfiguration(createTableConfig, dumperConfig, importerConfig);
+ MigrationTaskConfiguration result = new
MigrationTaskConfiguration(jobConfig.getSourceResourceName(),
createTableConfig, dumperConfig, importerConfig);
log.info("buildTaskConfiguration, sourceResourceName={}, result={}",
jobConfig.getSourceResourceName(), result);
return result;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index 2904932b9e0..ca19765cf46 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -49,6 +49,8 @@ public final class MigrationJobItemContext implements
InventoryIncrementalJobIte
private final int shardingItem;
+ private final String dataSourceName;
+
private volatile boolean stopping;
private volatile JobStatus status = JobStatus.RUNNING;
@@ -88,6 +90,7 @@ public final class MigrationJobItemContext implements
InventoryIncrementalJobIte
this.jobConfig = jobConfig;
jobId = jobConfig.getJobId();
this.shardingItem = shardingItem;
+ this.dataSourceName = taskConfig.getDataSourceName();
this.initProgress = initProgress;
this.jobProcessContext = jobProcessContext;
this.taskConfig = taskConfig;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
index 52672769aab..8420bbaf6f1 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
@@ -33,6 +33,8 @@ import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
@ToString
public final class MigrationTaskConfiguration implements
PipelineTaskConfiguration {
+ private final String dataSourceName;
+
private final CreateTableConfiguration createTableConfig;
private final DumperConfiguration dumperConfig;
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/InventoryIncrementalJobItemProgressTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/InventoryIncrementalJobItemProgressTest.java
index fcd322e009b..efc158c5ccc 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/InventoryIncrementalJobItemProgressTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/InventoryIncrementalJobItemProgressTest.java
@@ -33,6 +33,7 @@ import java.util.Optional;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -46,13 +47,13 @@ public final class InventoryIncrementalJobItemProgressTest {
assertThat(actual.getStatus(), is(JobStatus.RUNNING));
assertThat(actual.getSourceDatabaseType(), is("H2"));
assertThat(actual.getInventory().getInventoryTaskProgressMap().size(),
is(4));
-
assertThat(actual.getIncremental().getIncrementalTaskProgressMap().size(),
is(1));
+ assertNotNull(actual.getIncremental().getIncrementalTaskProgress());
}
@Test
public void assertGetIncrementalPosition() {
InventoryIncrementalJobItemProgress actual =
getJobItemProgress(ConfigurationFileUtil.readFile("job-progress.yaml"));
- Optional<IngestPosition<?>> position =
actual.getIncremental().getIncrementalPosition("ds0");
+ Optional<IngestPosition<?>> position =
actual.getIncremental().getIncrementalPosition();
assertTrue(position.isPresent());
assertThat(position.get(), instanceOf(PlaceholderPosition.class));
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
index f00810fc2c5..ebc466c0caf 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
@@ -37,12 +37,12 @@ public final class
YamlInventoryIncrementalJobItemProgressSwapperTest {
YamlInventoryIncrementalJobItemProgress actual =
SWAPPER.swapToYamlConfiguration(progress);
assertThat(actual.getStatus(), is("RUNNING"));
assertThat(actual.getSourceDatabaseType(), is("H2"));
+ assertThat(actual.getDataSourceName(), is("ds_0"));
assertThat(actual.getInventory().getFinished().length, is(2));
assertArrayEquals(actual.getInventory().getFinished(), new
String[]{"ds0.t_2", "ds0.t_1"});
assertThat(actual.getInventory().getUnfinished().size(), is(2));
assertThat(actual.getInventory().getUnfinished().get("ds1.t_2"),
is("i,1,2"));
assertThat(actual.getInventory().getUnfinished().get("ds1.t_1"),
is(""));
- assertThat(actual.getIncremental().getDataSourceName(), is("ds0"));
assertThat(actual.getIncremental().getPosition().length(), is(0));
}
@@ -60,7 +60,7 @@ public final class
YamlInventoryIncrementalJobItemProgressSwapperTest {
assertNotNull(progress.getInventory());
assertNotNull(progress.getIncremental());
assertThat(progress.getInventory().getInventoryFinishedPercentage(),
is(0));
- assertThat(progress.getIncremental().getDataSourceName(), is(""));
+ assertThat(progress.getDataSourceName(), is("ds_0"));
assertThat(progress.getIncremental().getIncrementalLatestActiveTimeMillis(),
is(0L));
YamlInventoryIncrementalJobItemProgress actual =
SWAPPER.swapToYamlConfiguration(progress);
assertNotNull(actual.getInventory());
@@ -75,7 +75,7 @@ public final class
YamlInventoryIncrementalJobItemProgressSwapperTest {
assertNotNull(progress.getInventory());
assertNotNull(progress.getIncremental());
assertThat(progress.getInventory().getInventoryFinishedPercentage(),
is(0));
- assertThat(progress.getIncremental().getDataSourceName(), is("ds_0"));
+ assertThat(progress.getDataSourceName(), is("ds_0"));
assertThat(progress.getIncremental().getIncrementalLatestActiveTimeMillis(),
is(0L));
YamlInventoryIncrementalJobItemProgress actual =
SWAPPER.swapToYamlConfiguration(progress);
assertNotNull(actual.getInventory());
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml
index 0ae217af602..8a99cbfade1 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml
@@ -15,8 +15,8 @@
# limitations under the License.
#
+dataSourceName: ds_0
incremental:
- dataSourceName: ds0
delay:
lastEventTimestamps: 0
latestActiveTimeMillis: 50
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-failure.yaml
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-failure.yaml
index ce53fea4473..e3fc461f8e0 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-failure.yaml
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-failure.yaml
@@ -15,5 +15,7 @@
# limitations under the License.
#
#
+
+dataSourceName: ds_0
sourceDatabaseType: H2
status: PREPARING_FAILURE
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-running.yaml
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-running.yaml
index c71ae9fd49f..f904a7669a1 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-running.yaml
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-running.yaml
@@ -16,8 +16,8 @@
#
#
+dataSourceName: ds_0
incremental:
- dataSourceName: ds_0
delay:
lastEventTimestamps: 0
latestActiveTimeMillis: 0
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress.yaml
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress.yaml
index 61fe5448960..fa82043980c 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress.yaml
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress.yaml
@@ -15,8 +15,8 @@
# limitations under the License.
#
+dataSourceName: ds_0
incremental:
- dataSourceName: ds0
delay:
lastEventTimestamps: 0
latestActiveTimeMillis: 0