This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git

The following commit(s) were added to refs/heads/master by this push:
     new 3022ab5cf1d Rename MigrateSourceTargetEntry (#36104)
3022ab5cf1d is described below

commit 3022ab5cf1d0ba6389ce772233faa593b77647f7
Author: Liang Zhang <zhangli...@apache.org>
AuthorDate: Tue Jul 29 15:12:19 2025 +0800

    Rename MigrateSourceTargetEntry (#36104)

    * Rename MigrateSourceTargetEntry

    * Rename MigrateSourceTargetEntry
---
 .../data/pipeline/scenario/migration/api/MigrationJobAPI.java | 11 ++++++-----
 .../distsql/handler/update/MigrateTableExecutor.java          |  8 ++++----
 .../{SourceTargetEntry.java => MigrateSourceTargetEntry.java} |  8 ++++----
 .../pipeline/scenario/migration/api/MigrationJobAPITest.java  | 10 +++++-----
 4 files changed, 19 insertions(+), 18 deletions(-)

diff --git a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 9e7d586864b..992b42e8f7f 100644
--- a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -44,7 +44,7 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobTy
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.config.YamlMigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.SourceTargetEntry;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.MigrateSourceTargetEntry; import org.apache.shardingsphere.infra.config.rule.RuleConfiguration; import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties; import org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser; @@ -115,21 +115,22 @@ public final class MigrationJobAPI implements TransmissionJobAPI { * @param targetDatabaseName target database name * @return job id */ - public String schedule(final PipelineContextKey contextKey, final Collection<SourceTargetEntry> sourceTargetEntries, final String targetDatabaseName) { + public String schedule(final PipelineContextKey contextKey, final Collection<MigrateSourceTargetEntry> sourceTargetEntries, final String targetDatabaseName) { MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, sourceTargetEntries, targetDatabaseName)); jobManager.start(jobConfig); return jobConfig.getJobId(); } - private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final Collection<SourceTargetEntry> sourceTargetEntries, final String targetDatabaseName) { + private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, + final Collection<MigrateSourceTargetEntry> sourceTargetEntries, final String targetDatabaseName) { YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration(); result.setTargetDatabaseName(targetDatabaseName); Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION"); Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>(sourceTargetEntries.size(), 1F); Map<String, YamlPipelineDataSourceConfiguration> configSources = new LinkedHashMap<>(sourceTargetEntries.size(), 1F); YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new 
YamlDataSourceConfigurationSwapper(); - for (SourceTargetEntry each : new HashSet<>(sourceTargetEntries).stream() - .sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName).thenComparing(each -> each.getSource().format())).collect(Collectors.toList())) { + for (MigrateSourceTargetEntry each : new HashSet<>(sourceTargetEntries).stream() + .sorted(Comparator.comparing(MigrateSourceTargetEntry::getTargetTableName).thenComparing(each -> each.getSource().format())).collect(Collectors.toList())) { sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> new LinkedList<>()).add(each.getSource()); ShardingSpherePreconditions.checkState(1 == sourceDataNodes.get(each.getTargetTableName()).size(), () -> new PipelineInvalidParameterException("more than one source table for " + each.getTargetTableName())); diff --git a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java index 4c3aa415a80..ccc84101e56 100644 --- a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java +++ b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java @@ -23,7 +23,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequire import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.segment.MigrationSourceTargetSegment; -import 
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.SourceTargetEntry; +import org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.MigrateSourceTargetEntry; import org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.updatable.MigrateTableStatement; import org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorDatabaseAware; import org.apache.shardingsphere.distsql.handler.engine.update.DistSQLUpdateExecutor; @@ -56,12 +56,12 @@ public final class MigrateTableExecutor implements DistSQLUpdateExecutor<Migrate jobAPI.schedule(new PipelineContextKey(InstanceType.PROXY), getSourceTargetEntries(sqlStatement), targetDatabaseName); } - private Collection<SourceTargetEntry> getSourceTargetEntries(final MigrateTableStatement sqlStatement) { - Collection<SourceTargetEntry> result = new LinkedList<>(); + private Collection<MigrateSourceTargetEntry> getSourceTargetEntries(final MigrateTableStatement sqlStatement) { + Collection<MigrateSourceTargetEntry> result = new LinkedList<>(); for (MigrationSourceTargetSegment each : sqlStatement.getSourceTargetEntries()) { DataNode dataNode = new DataNode(each.getSourceDatabaseName(), each.getSourceTableName()); dataNode.setSchemaName(each.getSourceSchemaName()); - result.add(new SourceTargetEntry(each.getTargetDatabaseName(), dataNode, each.getTargetTableName())); + result.add(new MigrateSourceTargetEntry(dataNode, each.getTargetDatabaseName(), each.getTargetTableName())); } return result; } diff --git a/kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/pojo/SourceTargetEntry.java b/kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/pojo/MigrateSourceTargetEntry.java similarity index 94% rename from 
kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/pojo/SourceTargetEntry.java rename to kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/pojo/MigrateSourceTargetEntry.java index a0e16ccaaf7..a8511f2c069 100644 --- a/kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/pojo/SourceTargetEntry.java +++ b/kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/pojo/MigrateSourceTargetEntry.java @@ -23,16 +23,16 @@ import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.datanode.DataNode; /** - * Source target entry. + * Migrate source target entry. */ @RequiredArgsConstructor @Getter @EqualsAndHashCode(of = {"source", "targetTableName"}) -public final class SourceTargetEntry { - - private final String targetDatabaseName; +public final class MigrateSourceTargetEntry { private final DataNode source; + private final String targetDatabaseName; + private final String targetTableName; } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java index 2a6d87d9107..9cbe3cd6e7f 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java @@ -48,7 +48,7 @@ import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarr import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext; -import org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.SourceTargetEntry; +import org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.MigrateSourceTargetEntry; import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; @@ -258,21 +258,21 @@ class MigrationJobAPITest { @Test void assertCreateJobConfigFailedOnMoreThanOneSourceTable() { - Collection<SourceTargetEntry> sourceTargetEntries = Stream.of("t_order_0", "t_order_1") - .map(each -> new SourceTargetEntry("logic_db", new DataNode("ds_0", each), "t_order")).collect(Collectors.toList()); + Collection<MigrateSourceTargetEntry> sourceTargetEntries = Stream.of("t_order_0", "t_order_1") + .map(each -> new MigrateSourceTargetEntry(new DataNode("ds_0", each), "logic_db", "t_order")).collect(Collectors.toList()); assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.schedule(PipelineContextUtils.getContextKey(), sourceTargetEntries, "logic_db")); } @Test void assertCreateJobConfigFailedOnDataSourceNotExist() { - Collection<SourceTargetEntry> sourceTargetEntries = Collections.singleton(new SourceTargetEntry("logic_db", new DataNode("ds_not_exists", "t_order"), "t_order")); + Collection<MigrateSourceTargetEntry> sourceTargetEntries = Collections.singleton(new MigrateSourceTargetEntry(new DataNode("ds_not_exists", "t_order"), "logic_db", "t_order")); assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.schedule(PipelineContextUtils.getContextKey(), sourceTargetEntries, "logic_db")); } @Test void 
assertCreateJobConfig() throws SQLException { initIntPrimaryEnvironment(); - SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order"); + MigrateSourceTargetEntry sourceTargetEntry = new MigrateSourceTargetEntry(new DataNode("ds_0", "t_order"), "logic_db", "t_order"); String jobId = jobAPI.schedule(PipelineContextUtils.getContextKey(), Collections.singleton(sourceTargetEntry), "logic_db"); MigrationJobConfiguration actual = jobConfigManager.getJobConfiguration(jobId); assertThat(actual.getTargetDatabaseName(), is("logic_db"));