This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new cb0cf03f9cc Revert MigrationJobId.jobShardingDataNodes (#29286)
cb0cf03f9cc is described below
commit cb0cf03f9cc53ca78222fb645561218041d2f56d
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Dec 5 10:30:14 2023 +0800
Revert MigrationJobId.jobShardingDataNodes (#29286)
---
.../data/pipeline/scenario/migration/MigrationJobId.java | 4 ++++
.../data/pipeline/scenario/migration/api/MigrationJobAPI.java | 2 +-
.../shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java | 4 +++-
.../test/it/data/pipeline/core/util/JobConfigurationBuilder.java | 2 +-
4 files changed, 9 insertions(+), 3 deletions(-)
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
index b0a13d30437..aad7a1a9a98 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
@@ -23,6 +23,8 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
+import java.util.List;
+
/**
* Migration job id.
*/
@@ -33,4 +35,6 @@ public final class MigrationJobId implements PipelineJobId {
private final PipelineJobType jobType = new MigrationJobType();
private final PipelineContextKey contextKey;
+
+ private final List<String> jobShardingDataNodes;
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 75d1bf91c5e..96f36ac11ae 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -162,7 +162,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
- result.setJobId(PipelineJobIdUtils.marshal(new MigrationJobId(contextKey)));
+ result.setJobId(PipelineJobIdUtils.marshal(new MigrationJobId(contextKey, result.getJobShardingDataNodes())));
return result;
}
diff --git a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
index f7c7bcf31bb..8407e00e3e1 100644
--- a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
+++ b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
@@ -23,6 +23,8 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobTy
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
+
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -38,7 +40,7 @@ class PipelineJobIdUtilsTest {
private void assertParse0(final InstanceType instanceType) {
PipelineContextKey contextKey = new PipelineContextKey("sharding_db", instanceType);
- String jobId = PipelineJobIdUtils.marshal(new MigrationJobId(contextKey));
+ String jobId = PipelineJobIdUtils.marshal(new MigrationJobId(contextKey, Collections.singletonList("t_order:ds_0.t_order_0,ds_0.t_order_1")));
assertThat(PipelineJobIdUtils.parseJobType(jobId), instanceOf(MigrationJobType.class));
PipelineContextKey actualContextKey = PipelineJobIdUtils.parseContextKey(jobId);
assertThat(actualContextKey.getInstanceType(), is(instanceType));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
index 8a63105604c..80be58b02d9 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -77,7 +77,7 @@ public final class JobConfigurationBuilder {
result.setTablesFirstDataNodes("t_order:ds_0.t_order");
result.setJobShardingDataNodes(Collections.singletonList("t_order:ds_0.t_order"));
PipelineContextKey contextKey = new PipelineContextKey(RandomStringUtils.randomAlphabetic(32), InstanceType.PROXY);
- result.setJobId(PipelineJobIdUtils.marshal(new MigrationJobId(contextKey)));
+ result.setJobId(PipelineJobIdUtils.marshal(new MigrationJobId(contextKey, result.getJobShardingDataNodes())));
Map<String, YamlPipelineDataSourceConfiguration> sources = new LinkedHashMap<>();
String databaseNameSuffix = RandomStringUtils.randomAlphabetic(9);
PipelineDataSourceConfiguration sourceDataSourceConfig = new
StandardPipelineDataSourceConfiguration(