This is an automated email from the ASF dual-hosted git repository. panjuan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new a66b486 Separate sharding related code from scaling-core module part 3 and refactor job configuration structure (#14045) a66b486 is described below commit a66b486f7cea7819516f16d779e1d6de3bf88306 Author: Hongsheng Zhong <sand...@126.com> AuthorDate: Sat Dec 11 10:55:53 2021 +0800 Separate sharding related code from scaling-core module part 3 and refactor job configuration structure (#14045) --- .../ShardingSphereJDBCDataSourceConfiguration.java | 12 +++ .../typed/StandardJDBCDataSourceConfiguration.java | 30 ++++++-- .../typed/TypedDataSourceConfiguration.java | 18 +++++ .../shardingsphere/infra/datanode/DataNode.java | 19 +++++ .../infra/datanode/DataNodeTest.java | 14 ++++ .../core/datasource/DataSourceManager.java | 3 +- .../shardingsphere-data-pipeline-spi/pom.xml | 2 +- .../migration/common/api/ScalingWorker.java | 1 + .../job/preparer/AbstractDataSourcePreparer.java | 66 ++--------------- .../common/spi/RuleJobConfigurationPreparer.java | 2 +- .../scaling/core/api/impl/ScalingAPIImpl.java | 8 +- .../scaling/core/config/HandleConfiguration.java | 28 +++++-- .../scaling/core/config/JobConfiguration.java | 21 +++--- .../core/config/internal/JobDataNodeEntry.java | 86 ++++++++++++++++++++++ .../core/config/internal/JobDataNodeLine.java | 74 +++++++++++++++++++ .../scaling/core/job/JobContext.java | 2 +- .../scaling/core/job/ScalingJob.java | 2 +- .../consistency/DataConsistencyCheckerImpl.java | 1 + .../core/job/preparer/DataSourcePreparer.java | 6 +- ...arer.java => PrepareTargetTablesParameter.java} | 23 +++--- .../core/job/preparer/ScalingJobPreparer.java | 5 +- .../scaling/core/util/ScalingTaskUtil.java | 2 +- .../ShardingRuleJobConfigurationPreparer.java | 65 +++++++++------- .../preparer/AbstractDataSourcePreparerTest.java | 4 +- .../scaling/core/api/impl/ScalingAPIImplTest.java | 2 +- .../core/config/internal/JobDataNodeEntryTest.java | 44 +++++++++++ .../core/config/internal/JobDataNodeLineTest.java | 42 +++++++++++ .../executor/importer/AbstractImporterTest.java | 6 +- .../core/fixture/FixtureDataSourcePreparer.java | 4 +- .../scaling/core/util/ResourceUtil.java | 14 +++- .../component/checker/MySQLDataSourcePreparer.java | 14 ++-- .../component/MySQLDataSourcePreparerTest.java | 26 +++---- .../checker/OpenGaussDataSourcePreparer.java | 33 +++++---- 33 files changed, 495 insertions(+), 184 deletions(-) diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/ShardingSphereJDBCDataSourceConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/ShardingSphereJDBCDataSourceConfiguration.java index d339bda..54e2fb5 100644 --- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/ShardingSphereJDBCDataSourceConfiguration.java +++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/ShardingSphereJDBCDataSourceConfiguration.java @@ -98,6 +98,18 @@ public final class ShardingSphereJDBCDataSourceConfiguration extends TypedDataSo } /** + * Get actual data source configuration. + * + * @param actualDataSourceName actual data source name + * @return actual data source configuration + */ + public StandardJDBCDataSourceConfiguration getActualDataSourceConfig(final String actualDataSourceName) { + Map<String, Object> yamlDataSourceConfig = rootConfig.getDataSources().get(actualDataSourceName); + Preconditions.checkNotNull(yamlDataSourceConfig, "actualDataSourceName '{}' does not exist", actualDataSourceName); + return new StandardJDBCDataSourceConfiguration(yamlDataSourceConfig); + } + + /** * YAML parameter configuration. */ @NoArgsConstructor diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/StandardJDBCDataSourceConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/StandardJDBCDataSourceConfiguration.java index 7dbf4f5..6eed671 100644 --- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/StandardJDBCDataSourceConfiguration.java +++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/StandardJDBCDataSourceConfiguration.java @@ -27,7 +27,7 @@ import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry; import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper; import org.apache.shardingsphere.infra.yaml.engine.YamlEngine; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; /** @@ -53,8 +53,20 @@ public final class StandardJDBCDataSourceConfiguration extends TypedDataSourceCo @SuppressWarnings("unchecked") public StandardJDBCDataSourceConfiguration(final String parameter) { + this(YamlEngine.unmarshal(parameter, Map.class), parameter); + } + + /** + * Construct by YAML data source configuration. + * + * @param yamlDataSourceConfig YAML data source configuration, equivalent to {@link DataSourceConfiguration} + */ + public StandardJDBCDataSourceConfiguration(final Map<String, Object> yamlDataSourceConfig) { + this(yamlDataSourceConfig, YamlEngine.marshal(yamlDataSourceConfig)); + } + + private StandardJDBCDataSourceConfiguration(final Map<String, Object> yamlConfig, final String parameter) { this.parameter = parameter; - Map<String, Object> yamlConfig = YamlEngine.unmarshal(parameter, Map.class); if (!yamlConfig.containsKey(DATA_SOURCE_CLASS_NAME)) { yamlConfig.put(DATA_SOURCE_CLASS_NAME, "com.zaxxer.hikari.HikariDataSource"); } @@ -83,11 +95,13 @@ public final class StandardJDBCDataSourceConfiguration extends TypedDataSourceCo hikariConfig.setJdbcUrl(new JdbcUri(hikariConfig.getJdbcUrl()).appendParameters(parameters)); } - private static String wrapParameter(final String jdbcUrl, final String username, final String password) { - Map<String, String> parameter = new HashMap<>(3, 1); - parameter.put("jdbcUrl", jdbcUrl); - parameter.put("username", username); - parameter.put("password", password); - return YamlEngine.marshal(parameter); + private static Map<String, Object> wrapParameter(final String jdbcUrl, final String username, final String password) { + Map<String, Object> result = new LinkedHashMap<>(3, 1); + result.put("jdbcUrl", jdbcUrl); + result.put("username", username); + result.put("password", password); + return result; } + + // TODO toShardingSphereJDBCDataSource(final String actualDataSourceName, final String logicTableName, final String actualTableName) } diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/TypedDataSourceConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/TypedDataSourceConfiguration.java index a1e8b2b..1473dd1 100644 --- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/TypedDataSourceConfiguration.java +++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/typed/TypedDataSourceConfiguration.java @@ -26,6 +26,7 @@ import org.apache.shardingsphere.spi.typed.TypedSPIRegistry; import javax.sql.DataSource; import java.sql.SQLException; import java.util.Map; +import java.util.Objects; import java.util.Optional; /** @@ -72,6 +73,23 @@ public abstract class TypedDataSourceConfiguration { */ public abstract DatabaseType getDatabaseType(); + @Override + public int hashCode() { + return Objects.hash(getType(), getParameter()); + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (null == obj || getClass() != obj.getClass()) { + return false; + } + TypedDataSourceConfiguration that = (TypedDataSourceConfiguration) obj; + return Objects.equals(getType(), that.getType()) && Objects.equals(getParameter(), that.getParameter()); + } + /** * Wrap. * diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/datanode/DataNode.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/datanode/DataNode.java index 2916ee0..e635625 100644 --- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/datanode/DataNode.java +++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/datanode/DataNode.java @@ -46,6 +46,7 @@ public final class DataNode { * @param dataNode string of data node. use {@code .} to split data source name and table name. */ public DataNode(final String dataNode) { + // TODO remove duplicated splitting? if (!isValidDataNode(dataNode)) { throw new ShardingSphereConfigurationException("Invalid format for actual data nodes: '%s'", dataNode); } @@ -75,4 +76,22 @@ public final class DataNode { public int hashCode() { return Objects.hashCode(dataSourceName.toUpperCase(), tableName.toUpperCase()); } + + /** + * Format DataNode as string. + * + * @return formatted string + */ + public String format() { + return dataSourceName + DELIMITER + tableName; + } + + /** + * Get formatted text length. + * + * @return formatted text length + */ + public int getFormattedTextLength() { + return dataSourceName.length() + DELIMITER.length() + tableName.length(); + } } diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/datanode/DataNodeTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/datanode/DataNodeTest.java index b6af666..97e2facf 100644 --- a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/datanode/DataNodeTest.java +++ b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/datanode/DataNodeTest.java @@ -79,4 +79,18 @@ public final class DataNodeTest { public void assertEmptyTableDataNode() { new DataNode("ds_0."); } + + @Test + public void assertFormat() { + String expected = "ds_0.tbl_0"; + DataNode dataNode = new DataNode(expected); + assertThat(dataNode.format(), is(expected)); + } + + @Test + public void assertFormattedTextLength() { + String text = "ds_0.tbl_0"; + DataNode dataNode = new DataNode(text); + assertThat(dataNode.getFormattedTextLength(), is(text.length())); + } } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManager.java index 07e9451..e01e2cc 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManager.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManager.java @@ -22,7 +22,6 @@ import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.infra.config.datasource.typed.TypedDataSourceConfiguration; -import javax.sql.DataSource; import java.sql.SQLException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -71,7 +70,7 @@ public final class DataSourceManager implements AutoCloseable { * @param dataSourceConfig data source configuration * @return data source */ - public DataSource getDataSource(final TypedDataSourceConfiguration dataSourceConfig) { + public DataSourceWrapper getDataSource(final TypedDataSourceConfiguration dataSourceConfig) { if (cachedDataSources.containsKey(dataSourceConfig)) { return cachedDataSources.get(dataSourceConfig); } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/pom.xml b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/pom.xml index 26e77f7..326cc6e 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/pom.xml +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/pom.xml @@ -25,7 +25,7 @@ <version>5.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> - <artifactId>shardingsphere-data-pipeline-api</artifactId> + <artifactId>shardingsphere-data-pipeline-spi</artifactId> <name>${project.artifactId}</name> </project> diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/api/ScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/api/ScalingWorker.java index bbd976f..692b5cb 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/api/ScalingWorker.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/api/ScalingWorker.java @@ -109,6 +109,7 @@ public final class ScalingWorker { return rootConfig.getRules().stream().filter(each -> each instanceof YamlShardingRuleConfiguration).map(each -> (YamlShardingRuleConfiguration) each).findFirst(); } + // TODO more accurate comparison private boolean isShardingRulesTheSame(final YamlShardingRuleConfiguration sourceShardingConfig, final YamlShardingRuleConfiguration targetShardingConfig) { for (Entry<String, YamlTableRuleConfiguration> entry : sourceShardingConfig.getTables().entrySet()) { entry.getValue().setLogicTable(null); diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/job/preparer/AbstractDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/job/preparer/AbstractDataSourcePreparer.java index b5e1870..d42d487 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/job/preparer/AbstractDataSourcePreparer.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/job/preparer/AbstractDataSourcePreparer.java @@ -20,34 +20,16 @@ package org.apache.shardingsphere.migration.common.job.preparer; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceFactory; import org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceWrapper; -import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration; -import org.apache.shardingsphere.infra.config.datasource.DataSourceConverter; -import org.apache.shardingsphere.infra.config.datasource.typed.ShardingSphereJDBCDataSourceConfiguration; -import org.apache.shardingsphere.infra.config.datasource.typed.TypedDataSourceConfiguration; -import org.apache.shardingsphere.infra.datanode.DataNode; -import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper; -import org.apache.shardingsphere.scaling.core.config.JobConfiguration; +import org.apache.shardingsphere.scaling.core.config.RuleConfiguration; import org.apache.shardingsphere.scaling.core.job.preparer.ActualTableDefinition; import org.apache.shardingsphere.scaling.core.job.preparer.DataSourcePreparer; import org.apache.shardingsphere.scaling.core.job.preparer.TableDefinitionSQLType; -import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration; -import org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration; -import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration; -import org.apache.shardingsphere.sharding.rule.ShardingRule; -import org.apache.shardingsphere.sharding.yaml.swapper.ShardingRuleConfigurationConverter; -import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -67,48 +49,12 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer { private final DataSourceFactory dataSourceFactory = new DataSourceFactory(); - protected DataSourceWrapper getSourceDataSource(final JobConfiguration jobConfig) { - return dataSourceFactory.newInstance(jobConfig.getRuleConfig().getSource().unwrap()); + protected DataSourceWrapper getSourceDataSource(final RuleConfiguration ruleConfig) { + return dataSourceFactory.newInstance(ruleConfig.getSource().unwrap()); } - protected DataSourceWrapper getTargetDataSource(final JobConfiguration jobConfig) { - return dataSourceFactory.newInstance(jobConfig.getRuleConfig().getTarget().unwrap()); - } - - protected Collection<String> getLogicTableNames(final TypedDataSourceConfiguration sourceConfig) { - ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) sourceConfig; - ShardingRuleConfiguration ruleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules()); - return getLogicTableNames(ruleConfig); - } - - private Collection<String> getLogicTableNames(final ShardingRuleConfiguration ruleConfig) { - Collection<String> result = new ArrayList<>(); - List<String> tableNames = ruleConfig.getTables().stream().map(ShardingTableRuleConfiguration::getLogicTable).collect(Collectors.toList()); - List<String> autoTableNames = ruleConfig.getAutoTables().stream().map(ShardingAutoTableRuleConfiguration::getLogicTable).collect(Collectors.toList()); - result.addAll(tableNames); - result.addAll(autoTableNames); - return result; - } - - /** - * Get data source table names map. - * - * @param sourceConfig source data source configuration - * @return data source table names map. map(data source, map(first actual table name of logic table, logic table name)). - */ - protected Map<DataSource, Map<String, String>> getDataSourceTableNamesMap(final TypedDataSourceConfiguration sourceConfig) { - ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) sourceConfig; - ShardingRuleConfiguration ruleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules()); - Map<String, DataSourceConfiguration> dataSourceConfigs = new YamlDataSourceConfigurationSwapper().getDataSourceConfigurations(source.getRootConfig()); - ShardingRule shardingRule = new ShardingRule(ruleConfig, source.getRootConfig().getDataSources().keySet()); - Collection<String> logicTableNames = getLogicTableNames(ruleConfig); - Map<String, Map<String, String>> dataSourceNameTableNamesMap = new HashMap<>(); - for (String each : logicTableNames) { - DataNode dataNode = shardingRule.getDataNode(each); - dataSourceNameTableNamesMap.computeIfAbsent(dataNode.getDataSourceName(), key -> new LinkedHashMap<>()).put(dataNode.getTableName(), each); - } - return dataSourceNameTableNamesMap.entrySet().stream().collect( - Collectors.toMap(entry -> DataSourceConverter.getDataSource(dataSourceConfigs.get(entry.getKey())), Entry::getValue, (oldValue, currentValue) -> oldValue, LinkedHashMap::new)); + protected DataSourceWrapper getTargetDataSource(final RuleConfiguration ruleConfig) { + return dataSourceFactory.newInstance(ruleConfig.getTarget().unwrap()); } protected void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException { @@ -116,7 +62,7 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer { try (Statement statement = targetConnection.createStatement()) { statement.execute(sql); } catch (final SQLException ex) { - for (String ignoreMessage: IGNORE_EXCEPTION_MESSAGE) { + for (String ignoreMessage : IGNORE_EXCEPTION_MESSAGE) { if (ex.getMessage().contains(ignoreMessage)) { return; } diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/spi/RuleJobConfigurationPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/spi/RuleJobConfigurationPreparer.java index 73cdb22..d661d32 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/spi/RuleJobConfigurationPreparer.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/migration/common/spi/RuleJobConfigurationPreparer.java @@ -42,7 +42,7 @@ public interface RuleJobConfigurationPreparer extends TypedSPI { * Convert to handle configuration, used to build job configuration. * * @param ruleConfig rule configuration - * @return handle configuration + * @return handle configuration. It won't be used directly, but merge necessary configuration (e.g. shardingTables, logicTables) into final {@link HandleConfiguration} */ HandleConfiguration convertToHandleConfig(RuleConfiguration ruleConfig); diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java index 0ba3087..59613f1 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java @@ -86,7 +86,7 @@ public final class ScalingAPIImpl implements ScalingAPI { JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(result.getJobId()); JobConfiguration jobConfig = getJobConfig(jobConfigPOJO); result.setActive(!jobConfigPOJO.isDisabled()); - result.setShardingTotalCount(jobConfig.getHandleConfig().getShardingTotalCount()); + result.setShardingTotalCount(jobConfig.getHandleConfig().getJobShardingCount()); result.setTables(jobConfig.getHandleConfig().getLogicTables()); result.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time")); result.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time")); @@ -135,7 +135,7 @@ public final class ScalingAPIImpl implements ScalingAPI { @Override public Optional<Long> start(final JobConfiguration jobConfig) { jobConfig.fillInProperties(); - if (jobConfig.getHandleConfig().getShardingTotalCount() == 0) { + if (jobConfig.getHandleConfig().getJobShardingCount() == 0) { log.warn("Invalid scaling job config!"); throw new ScalingJobCreationException("handleConfig shardingTotalCount is 0"); } @@ -150,7 +150,7 @@ public final class ScalingAPIImpl implements ScalingAPI { private String createJobConfig(final JobConfiguration jobConfig) { JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO(); jobConfigPOJO.setJobName(String.valueOf(jobConfig.getHandleConfig().getJobId())); - jobConfigPOJO.setShardingTotalCount(jobConfig.getHandleConfig().getShardingTotalCount()); + jobConfigPOJO.setShardingTotalCount(jobConfig.getHandleConfig().getJobShardingCount()); jobConfigPOJO.setJobParameter(YamlEngine.marshal(jobConfig)); jobConfigPOJO.getProps().setProperty("create_time", LocalDateTime.now().format(DATE_TIME_FORMATTER)); return YamlEngine.marshal(jobConfigPOJO); @@ -174,7 +174,7 @@ public final class ScalingAPIImpl implements ScalingAPI { @Override public Map<Integer, JobProgress> getProgress(final long jobId) { - return IntStream.range(0, getJobConfig(jobId).getHandleConfig().getShardingTotalCount()).boxed() + return IntStream.range(0, getJobConfig(jobId).getHandleConfig().getJobShardingCount()).boxed() .collect(LinkedHashMap::new, (map, each) -> map.put(each, ScalingAPIFactory.getGovernanceRepositoryAPI().getJobProgress(jobId, each)), LinkedHashMap::putAll); } diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java index 9c4e874..c0a7907 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java @@ -22,6 +22,8 @@ import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; +import java.util.List; + /** * Handle configuration. */ @@ -37,16 +39,26 @@ public final class HandleConfiguration { private int retryTimes = 3; - private String[] shardingTables; + /** + * Collection of each logic table's first data node. + * <p> + * If <pre>actualDataNodes: ds_${0..1}.t_order_${0..1}</pre> and <pre>actualDataNodes: ds_${0..1}.t_order_item_${0..1}</pre>, + * then value may be: {@code ds_0.t_order_0,ds_0.t_order_item_0}. + * </p> + */ + private String tablesFirstDataNodes; + + private List<String> jobShardingDataNodes; private String logicTables; - private int shardingItem; + /** + * Job sharding item, from {@link org.apache.shardingsphere.elasticjob.api.ShardingContext}. + */ + private Integer jobShardingItem; private int shardingSize = 1000 * 10000; - private boolean running = true; - private String databaseType; private WorkflowConfiguration workflowConfig; @@ -56,11 +68,11 @@ public final class HandleConfiguration { } /** - * Get sharding total count. + * Get job sharding count. * - * @return sharding total count + * @return job sharding count */ - public int getShardingTotalCount() { - return null == shardingTables ? 0 : shardingTables.length; + public int getJobShardingCount() { + return null == jobShardingDataNodes ? 0 : jobShardingDataNodes.size(); } } diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java index 1445b0b..38505c7 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java @@ -25,7 +25,6 @@ import lombok.NoArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.migration.common.spi.RuleJobConfigurationPreparer; -import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm; import org.apache.shardingsphere.spi.ShardingSphereServiceLoader; import org.apache.shardingsphere.spi.typed.TypedSPIRegistry; @@ -33,6 +32,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Optional; +import java.util.concurrent.ThreadLocalRandom; /** * Scaling job configuration. @@ -42,16 +42,9 @@ import java.util.Optional; @Getter @Setter @Slf4j +// TODO share for totally new scenario public final class JobConfiguration { - private static final SnowflakeKeyGenerateAlgorithm ID_AUTO_INCREASE_GENERATOR; - - static { - SnowflakeKeyGenerateAlgorithm generateAlgorithm = new SnowflakeKeyGenerateAlgorithm(); - generateAlgorithm.init(); - ID_AUTO_INCREASE_GENERATOR = generateAlgorithm; - } - static { ShardingSphereServiceLoader.register(RuleJobConfigurationPreparer.class); } @@ -66,13 +59,16 @@ public final class JobConfiguration { public void fillInProperties() { HandleConfiguration handleConfig = getHandleConfig(); if (null == handleConfig.getJobId()) { - handleConfig.setJobId((Long) ID_AUTO_INCREASE_GENERATOR.generateKey()); + handleConfig.setJobId(System.nanoTime() - ThreadLocalRandom.current().nextLong(100_0000)); } if (Strings.isNullOrEmpty(handleConfig.getDatabaseType())) { handleConfig.setDatabaseType(getRuleConfig().getSource().unwrap().getDatabaseType().getName()); } + if (null == handleConfig.getJobShardingItem()) { + handleConfig.setJobShardingItem(0); + } RuleConfiguration ruleConfig = getRuleConfig(); - if (null == handleConfig.getShardingTables()) { + if (null == handleConfig.getJobShardingDataNodes()) { List<HandleConfiguration> newHandleConfigs = new LinkedList<>(); for (String each : ruleConfig.getChangedYamlRuleConfigClassNames()) { Optional<RuleJobConfigurationPreparer> preparerOptional = TypedSPIRegistry.findRegisteredService(RuleJobConfigurationPreparer.class, each, null); @@ -82,8 +78,9 @@ public final class JobConfiguration { } // TODO handle several rules changed or dataSources changed for (HandleConfiguration each : newHandleConfigs) { - handleConfig.setShardingTables(each.getShardingTables()); + handleConfig.setJobShardingDataNodes(each.getJobShardingDataNodes()); handleConfig.setLogicTables(each.getLogicTables()); + handleConfig.setTablesFirstDataNodes(each.getTablesFirstDataNodes()); } } } diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeEntry.java new file mode 100644 index 0000000..2bb25b0 --- /dev/null +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeEntry.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.core.config.internal; + +import com.google.common.base.Splitter; +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.ToString; +import org.apache.shardingsphere.infra.datanode.DataNode; + +import java.util.LinkedList; +import java.util.List; + +/** + * Job data node entry. + */ +@Getter +@RequiredArgsConstructor +@ToString +public final class JobDataNodeEntry { + + @NonNull + private final String logicTableName; + + @NonNull + private final List<DataNode> dataNodes; + + /** + * Unmarshal from text. + * + * @param text marshalled entry + * @return entry + */ + public static JobDataNodeEntry unmarshal(final String text) { + List<String> segments = Splitter.on(":").splitToList(text); + String logicTableName = segments.get(0); + List<DataNode> dataNodes = new LinkedList<>(); + for (String each : Splitter.on(",").omitEmptyStrings().splitToList(segments.get(1))) { + dataNodes.add(new DataNode(each)); + } + return new JobDataNodeEntry(logicTableName, dataNodes); + } + + /** + * Marshal to text. + * + * @return text, format: logicTableName:dataNode1,dataNode2, e.g. t_order:ds_0.t_order_0,ds_0.t_order_1 + */ + public String marshal() { + StringBuilder result = new StringBuilder(getMarshalledTextEstimatedLength()); + result.append(logicTableName); + result.append(":"); + for (DataNode each : dataNodes) { + result.append(each.format()).append(','); + } + if (!dataNodes.isEmpty()) { + result.setLength(result.length() - 1); + } + return result.toString(); + } + + /** + * Get marshalled text estimated length. + * + * @return length + */ + public int getMarshalledTextEstimatedLength() { + return logicTableName.length() + 1 + dataNodes.stream().mapToInt(DataNode::getFormattedTextLength).sum() + dataNodes.size(); + } +} diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeLine.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeLine.java new file mode 100644 index 0000000..57ed60d --- /dev/null +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeLine.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.core.config.internal; + +import com.google.common.base.Splitter; +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + +import java.util.ArrayList; +import java.util.List; + +/** + * Job data node line. + */ +@Getter +@RequiredArgsConstructor +@ToString +public final class JobDataNodeLine { + + @NonNull + private final List<JobDataNodeEntry> entries; + + /** + * Unmarshal from text. + * + * @param text marshalled line + * @return line + */ + public static JobDataNodeLine unmarshal(final String text) { + List<String> segments = Splitter.on('|').omitEmptyStrings().splitToList(text); + List<JobDataNodeEntry> entries = new ArrayList<>(segments.size()); + for (String each : segments) { + entries.add(JobDataNodeEntry.unmarshal(each)); + } + return new JobDataNodeLine(entries); + } + + /** + * Marshal to text. + * + * @return text, format: entry1|entry2, e.g. t_order:ds_0.t_order_0,ds_0.t_order_1|t_order_item:ds_0.t_order_item_0,ds_0.t_order_item_1 + */ + public String marshal() { + StringBuilder result = new StringBuilder(getMarshalledTextEstimatedLength()); + for (JobDataNodeEntry each : entries) { + result.append(each.marshal()).append('|'); + } + if (!entries.isEmpty()) { + result.setLength(result.length() - 1); + } + return result.toString(); + } + + private int getMarshalledTextEstimatedLength() { + return entries.stream().mapToInt(JobDataNodeEntry::getMarshalledTextEstimatedLength).sum() + entries.size(); + } +} diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java index 175921c..ade2f2b 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java @@ -58,7 +58,7 @@ public final class JobContext { this.jobConfig = jobConfig; jobConfig.fillInProperties(); jobId = jobConfig.getHandleConfig().getJobId(); - shardingItem = jobConfig.getHandleConfig().getShardingItem(); + shardingItem = jobConfig.getHandleConfig().getJobShardingItem(); taskConfigs = jobConfig.convertToTaskConfigs(); } } diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java index dd6fd09..e117a25 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java @@ -41,7 +41,7 @@ public final class ScalingJob implements SimpleJob { public void execute(final ShardingContext shardingContext) { log.info("Execute scaling job {}-{}", shardingContext.getJobName(), shardingContext.getShardingItem()); JobConfiguration jobConfig = YamlEngine.unmarshal(shardingContext.getJobParameter(), JobConfiguration.class, true); - jobConfig.getHandleConfig().setShardingItem(shardingContext.getShardingItem()); + jobConfig.getHandleConfig().setJobShardingItem(shardingContext.getShardingItem()); JobContext jobContext = new JobContext(jobConfig); jobContext.setInitProgress(governanceRepositoryAPI.getJobProgress(jobContext.getJobId(), jobContext.getShardingItem())); jobContext.setJobPreparer(jobPreparer); diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java index 11d789a..d0c787f 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java @@ -62,6 +62,7 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker private final DataSourceFactory dataSourceFactory = new DataSourceFactory(); + // TODO replace to JobConfiguration private final JobContext jobContext; @Override diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/DataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/DataSourcePreparer.java index 94f0a81..41b9e62 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/DataSourcePreparer.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/DataSourcePreparer.java @@ -17,8 +17,6 @@ package org.apache.shardingsphere.scaling.core.job.preparer; -import org.apache.shardingsphere.scaling.core.config.JobConfiguration; - /** * Data source preparer. */ @@ -27,7 +25,7 @@ public interface DataSourcePreparer { /** * Prepare target tables. * - * @param jobConfig job configuration + * @param parameter prepare target tables parameter */ - void prepareTargetTables(JobConfiguration jobConfig); + void prepareTargetTables(PrepareTargetTablesParameter parameter); } diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/DataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/PrepareTargetTablesParameter.java similarity index 64% copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/DataSourcePreparer.java copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/PrepareTargetTablesParameter.java index 94f0a81..398bea1 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/DataSourcePreparer.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/PrepareTargetTablesParameter.java @@ -17,17 +17,22 @@ package org.apache.shardingsphere.scaling.core.job.preparer; -import org.apache.shardingsphere.scaling.core.config.JobConfiguration; +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.scaling.core.config.RuleConfiguration; +import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeLine; /** - * Data source preparer. + * Prepare target tables parameter. */ -public interface DataSourcePreparer { +@RequiredArgsConstructor +@Getter +public final class PrepareTargetTablesParameter { - /** - * Prepare target tables. - * - * @param jobConfig job configuration - */ - void prepareTargetTables(JobConfiguration jobConfig); + @NonNull + private final JobDataNodeLine tablesFirstDataNodes; + + @NonNull + private final RuleConfiguration ruleConfig; } diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java index f632f2a..93684eb 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java @@ -25,6 +25,7 @@ import org.apache.shardingsphere.infra.config.datasource.typed.TypedDataSourceCo import org.apache.shardingsphere.scaling.core.common.exception.PrepareFailedException; import org.apache.shardingsphere.scaling.core.config.JobConfiguration; import org.apache.shardingsphere.scaling.core.config.TaskConfiguration; +import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeLine; import org.apache.shardingsphere.scaling.core.job.JobContext; import org.apache.shardingsphere.scaling.core.job.JobStatus; import org.apache.shardingsphere.scaling.core.job.check.EnvironmentCheckerFactory; @@ -74,7 +75,9 @@ public final class ScalingJobPreparer { log.info("dataSourcePreparer null, ignore prepare target"); return; } - dataSourcePreparer.prepareTargetTables(jobConfig); + JobDataNodeLine tablesFirstDataNodes = JobDataNodeLine.unmarshal(jobConfig.getHandleConfig().getTablesFirstDataNodes()); + PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(tablesFirstDataNodes, jobConfig.getRuleConfig()); + dataSourcePreparer.prepareTargetTables(prepareTargetTablesParameter); } private void initDataSourceManager(final DataSourceManager dataSourceManager, final List<TaskConfiguration> taskConfigs) { diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ScalingTaskUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ScalingTaskUtil.java index 809b2cb..3f005d3 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ScalingTaskUtil.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ScalingTaskUtil.java @@ -55,7 +55,7 @@ public final class ScalingTaskUtil { } private static boolean isProgressCompleted(final Map<Integer, JobProgress> jobProgressMap, final HandleConfiguration handleConfig) { - return handleConfig.getShardingTotalCount() == jobProgressMap.size() + return handleConfig.getJobShardingCount() == jobProgressMap.size() && jobProgressMap.values().stream().allMatch(Objects::nonNull); } diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleJobConfigurationPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleJobConfigurationPreparer.java index 6ec0e9b..90596a4 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleJobConfigurationPreparer.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleJobConfigurationPreparer.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.sharding.schedule; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -35,6 +36,8 @@ import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration; import org.apache.shardingsphere.scaling.core.config.JobConfiguration; import org.apache.shardingsphere.scaling.core.config.RuleConfiguration; import org.apache.shardingsphere.scaling.core.config.TaskConfiguration; +import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeEntry; +import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeLine; import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration; import org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration; import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration; @@ -75,11 +78,18 @@ public final class ShardingRuleJobConfigurationPreparer implements RuleJobConfig for (Entry<String, List<DataNode>> entry : shouldScalingActualDataNodes.entrySet()) { dataNodes.addAll(entry.getValue()); } - result.setShardingTables(groupByDataSource(dataNodes)); + result.setJobShardingDataNodes(groupByDataSource(dataNodes)); result.setLogicTables(getLogicTables(shouldScalingActualDataNodes.keySet())); + result.setTablesFirstDataNodes(getTablesFirstDataNodes(shouldScalingActualDataNodes)); return result; } + /** + * Get scaling actual data nodes. + * + * @param ruleConfig rule configuration + * @return map(logic table name, DataNode of each logic table) + */ private static Map<String, List<DataNode>> getShouldScalingActualDataNodes(final RuleConfiguration ruleConfig) { TypedDataSourceConfiguration sourceConfig = ruleConfig.getSource().unwrap(); Preconditions.checkState(sourceConfig instanceof ShardingSphereJDBCDataSourceConfiguration, @@ -95,21 +105,28 @@ public final class ShardingRuleJobConfigurationPreparer implements RuleJobConfig return result; } - private static String[] groupByDataSource(final Collection<DataNode> dataNodes) { + private static List<String> groupByDataSource(final Collection<DataNode> dataNodes) { Map<String, Collection<DataNode>> dataSourceDataNodesMap = new LinkedHashMap<>(); for (DataNode each : dataNodes) { dataSourceDataNodesMap.computeIfAbsent(each.getDataSourceName(), k -> new LinkedList<>()).add(each); } - return dataSourceDataNodesMap.values().stream().map(each -> each.stream().map(dataNode -> String.format("%s.%s", dataNode.getDataSourceName(), dataNode.getTableName())) - .collect(Collectors.joining(","))).toArray(String[]::new); + return dataSourceDataNodesMap.values().stream().map(each -> each.stream().map(DataNode::format) + .collect(Collectors.joining(","))).collect(Collectors.toList()); } private static String getLogicTables(final Set<String> logicTables) { - return logicTables.stream() - .reduce((a, b) -> String.format("%s, %s", a, b)) - .orElse(""); + return Joiner.on(',').join(logicTables); } + private static String getTablesFirstDataNodes(final Map<String, List<DataNode>> actualDataNodes) { + List<JobDataNodeEntry> dataNodeEntries = new ArrayList<>(actualDataNodes.size()); + for (Entry<String, List<DataNode>> entry : actualDataNodes.entrySet()) { + dataNodeEntries.add(new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1))); + } + return new JobDataNodeLine(dataNodeEntries).marshal(); + } + + // TODO handle several rules changed or dataSources changed @Override public List<TaskConfiguration> convertToTaskConfigs(final JobConfiguration jobConfig) { List<TaskConfiguration> result = new LinkedList<>(); @@ -145,38 +162,36 @@ public final class ShardingRuleJobConfigurationPreparer implements RuleJobConfig return Optional.empty(); } - private static void filterByShardingDataSourceTables(final Map<String, Map<String, String>> dataSourceTableNameMap, final HandleConfiguration handleConfig) { - if (null == handleConfig.getShardingTables()) { - log.info("shardingTables null"); + private static void filterByShardingDataSourceTables(final Map<String, Map<String, String>> totalDataSourceTableNameMap, final HandleConfiguration handleConfig) { + if (null == handleConfig.getJobShardingDataNodes()) { + log.info("jobShardingDataNodes null"); return; } - Map<String, Set<String>> shardingDataSourceTableMap = toDataSourceTableNameMap(getShardingDataSourceTables(handleConfig)); - dataSourceTableNameMap.entrySet().removeIf(entry -> !shardingDataSourceTableMap.containsKey(entry.getKey())); - for (Entry<String, Map<String, String>> entry : dataSourceTableNameMap.entrySet()) { - filterByShardingTables(entry.getValue(), shardingDataSourceTableMap.get(entry.getKey())); + // TODO simplify data source and table name converting, and jobShardingDataNodes format + Map<String, Set<String>> jobDataSourceTableNameMap = toDataSourceTableNameMap(getJobShardingDataNodesEntry(handleConfig)); + totalDataSourceTableNameMap.entrySet().removeIf(entry -> !jobDataSourceTableNameMap.containsKey(entry.getKey())); + for (Entry<String, Map<String, String>> entry : totalDataSourceTableNameMap.entrySet()) { + filterByShardingTables(entry.getValue(), jobDataSourceTableNameMap.get(entry.getKey())); } } - private static String getShardingDataSourceTables(final HandleConfiguration handleConfig) { - if (handleConfig.getShardingItem() >= handleConfig.getShardingTables().length) { - log.warn("shardingItem={} ge handleConfig.shardingTables.len={}", handleConfig.getShardingItem(), handleConfig.getShardingTables().length); + private static String getJobShardingDataNodesEntry(final HandleConfiguration handleConfig) { + if (handleConfig.getJobShardingItem() >= handleConfig.getJobShardingDataNodes().size()) { + log.warn("jobShardingItem={} ge handleConfig.jobShardingDataNodes.len={}", handleConfig.getJobShardingItem(), handleConfig.getJobShardingDataNodes().size()); return ""; } - return handleConfig.getShardingTables()[handleConfig.getShardingItem()]; + return handleConfig.getJobShardingDataNodes().get(handleConfig.getJobShardingItem()); } private static void filterByShardingTables(final Map<String, String> fullTables, final Set<String> shardingTables) { fullTables.entrySet().removeIf(entry -> !shardingTables.contains(entry.getKey())); } - private static Map<String, Set<String>> toDataSourceTableNameMap(final String shardingDataSourceTables) { + private static Map<String, Set<String>> toDataSourceTableNameMap(final String jobShardingDataNodesEntry) { Map<String, Set<String>> result = new HashMap<>(); - for (String each : new InlineExpressionParser(shardingDataSourceTables).splitAndEvaluate()) { - String[] table = each.split("\\."); - if (!result.containsKey(table[0])) { - result.put(table[0], new HashSet<>()); - } - result.get(table[0]).add(table[1]); + for (String each : new InlineExpressionParser(jobShardingDataNodesEntry).splitAndEvaluate()) { + DataNode dataNode = new DataNode(each); + result.computeIfAbsent(dataNode.getDataSourceName(), k -> new HashSet<>()).add(dataNode.getTableName()); } return result; } diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/migration/common/job/preparer/AbstractDataSourcePreparerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/migration/common/job/preparer/AbstractDataSourcePreparerTest.java index 5c70ce2..528abd5 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/migration/common/job/preparer/AbstractDataSourcePreparerTest.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/migration/common/job/preparer/AbstractDataSourcePreparerTest.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.migration.common.job.preparer; import org.apache.commons.lang3.tuple.Pair; -import org.apache.shardingsphere.scaling.core.config.JobConfiguration; +import org.apache.shardingsphere.scaling.core.job.preparer.PrepareTargetTablesParameter; import org.apache.shardingsphere.scaling.core.job.preparer.TableDefinitionSQLType; import org.junit.Test; @@ -38,7 +38,7 @@ public final class AbstractDataSourcePreparerTest { private final AbstractDataSourcePreparer preparer = new AbstractDataSourcePreparer() { @Override - public void prepareTargetTables(final JobConfiguration jobConfig) { + public void prepareTargetTables(final PrepareTargetTablesParameter parameter) { } }; diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java index 0505851..c7cfc85 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java @@ -75,7 +75,7 @@ public final class ScalingAPIImplTest { assertThat(jobInfo.getTables(), is("t_order")); assertThat(jobInfo.getShardingTotalCount(), is(1)); List<Long> uncompletedJobIds = scalingAPI.getUncompletedJobIds("logic_db"); - assertThat(uncompletedJobIds.size(), is(0)); + assertTrue(uncompletedJobIds.size() > 0); } private Optional<JobInfo> getJobInfo(final long jobId) { diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeEntryTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeEntryTest.java new file mode 100644 index 0000000..443c772 --- /dev/null +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeEntryTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.core.config.internal; + +import org.apache.shardingsphere.infra.datanode.DataNode; +import org.junit.Test; + +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; + +public final class JobDataNodeEntryTest { + + @Test + public void assertSerialization() { + String text = "t_order:ds_0.t_order_0,ds_0.t_order_1"; + JobDataNodeEntry actual = JobDataNodeEntry.unmarshal(text); + assertNotNull(actual); + assertThat(actual.marshal(), is(text)); + assertThat(actual.getLogicTableName(), is("t_order")); + List<DataNode> dataNodes = actual.getDataNodes(); + assertNotNull(dataNodes); + assertThat(dataNodes.size(), is(2)); + assertThat(dataNodes.get(0).format(), is("ds_0.t_order_0")); + assertThat(dataNodes.get(1).format(), is("ds_0.t_order_1")); + } +} diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeLineTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeLineTest.java new file mode 100644 index 0000000..09acb46 --- /dev/null +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/internal/JobDataNodeLineTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.core.config.internal; + +import org.junit.Test; + +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; + +public final class JobDataNodeLineTest { + + @Test + public void assertSerialization() { + String text = "t_order:ds_0.t_order_0,ds_0.t_order_1|t_order_item:ds_0.t_order_item_0,ds_0.t_order_item_1"; + JobDataNodeLine actual = JobDataNodeLine.unmarshal(text); + assertNotNull(actual); + assertThat(actual.marshal(), is(text)); + List<JobDataNodeEntry> entries = actual.getEntries(); + assertNotNull(entries); + assertThat(entries.size(), is(2)); + assertThat(entries.get(0).getLogicTableName(), is("t_order")); + assertThat(entries.get(1).getLogicTableName(), is("t_order_item")); + } +} diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporterTest.java index 089d49d..c22331b 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporterTest.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporterTest.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.scaling.core.executor.importer; import org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceManager; +import org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceWrapper; import org.apache.shardingsphere.data.pipeline.core.ingest.channel.Channel; import org.apache.shardingsphere.data.pipeline.core.ingest.position.PlaceholderPosition; import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column; @@ -35,7 +36,6 @@ import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import javax.sql.DataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -75,8 +75,8 @@ public final class AbstractImporterTest { @Mock private Channel channel; - @Mock(extraInterfaces = AutoCloseable.class) - private DataSource dataSource; + @Mock + private DataSourceWrapper dataSource; @Mock private Connection connection; diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataSourcePreparer.java index 0230e98..7c3153a 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataSourcePreparer.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataSourcePreparer.java @@ -17,12 +17,12 @@ package org.apache.shardingsphere.scaling.core.fixture; -import org.apache.shardingsphere.scaling.core.config.JobConfiguration; import org.apache.shardingsphere.scaling.core.job.preparer.DataSourcePreparer; +import org.apache.shardingsphere.scaling.core.job.preparer.PrepareTargetTablesParameter; public final class FixtureDataSourcePreparer implements DataSourcePreparer { @Override - public void prepareTargetTables(final JobConfiguration jobConfig) { + public void prepareTargetTables(final PrepareTargetTablesParameter parameter) { } } diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ResourceUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ResourceUtil.java index feb585d..2ec25d0 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ResourceUtil.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ResourceUtil.java @@ -22,8 +22,10 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.shardingsphere.infra.config.datasource.typed.ShardingSphereJDBCDataSourceConfiguration; import org.apache.shardingsphere.infra.config.datasource.typed.StandardJDBCDataSourceConfiguration; +import org.apache.shardingsphere.scaling.core.config.HandleConfiguration; import org.apache.shardingsphere.scaling.core.config.JobConfiguration; import org.apache.shardingsphere.scaling.core.config.RuleConfiguration; +import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration; import java.io.IOException; import java.io.InputStream; @@ -73,12 +75,20 @@ public final class ResourceUtil { * @return standard JDBC as target job configuration */ public static JobConfiguration mockStandardJdbcTargetJobConfig() { + JobConfiguration result = new JobConfiguration(); RuleConfiguration ruleConfig = new RuleConfiguration(); + result.setRuleConfig(ruleConfig); setupChangedYamlRuleConfigClassNames(ruleConfig); ruleConfig.setSource(new ShardingSphereJDBCDataSourceConfiguration(readFileToString("/config_sharding_sphere_jdbc_source.yaml")).wrap()); ruleConfig.setTarget(new StandardJDBCDataSourceConfiguration(readFileToString("/config_standard_jdbc_target.yaml")).wrap()); - JobConfiguration result = new JobConfiguration(); - result.setRuleConfig(ruleConfig); + HandleConfiguration handleConfig = new HandleConfiguration(); + result.setHandleConfig(handleConfig); + handleConfig.setJobShardingItem(0); + handleConfig.setLogicTables("t_order"); + handleConfig.setTablesFirstDataNodes("t_order:ds_0.t_order"); + handleConfig.setJobShardingDataNodes(Collections.singletonList("ds_0.t_order")); + handleConfig.setWorkflowConfig(new WorkflowConfiguration("logic_db", "id1")); + result.fillInProperties(); return result; } diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java index 209c258..0746253 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java +++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java @@ -21,7 +21,9 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceWrapper; import org.apache.shardingsphere.migration.common.job.preparer.AbstractDataSourcePreparer; import org.apache.shardingsphere.scaling.core.common.exception.PrepareFailedException; -import org.apache.shardingsphere.scaling.core.config.JobConfiguration; +import org.apache.shardingsphere.scaling.core.config.RuleConfiguration; +import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeEntry; +import org.apache.shardingsphere.scaling.core.job.preparer.PrepareTargetTablesParameter; import org.apache.shardingsphere.scaling.mysql.component.MySQLScalingSQLBuilder; import java.sql.Connection; @@ -30,6 +32,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Collection; import java.util.Collections; +import java.util.stream.Collectors; /** * Data source preparer for MySQL. @@ -40,12 +43,13 @@ public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer { private final MySQLScalingSQLBuilder scalingSQLBuilder = new MySQLScalingSQLBuilder(Collections.emptyMap()); @Override - public void prepareTargetTables(final JobConfiguration jobConfig) { - try (DataSourceWrapper sourceDataSource = getSourceDataSource(jobConfig); + public void prepareTargetTables(final PrepareTargetTablesParameter parameter) { + RuleConfiguration ruleConfig = parameter.getRuleConfig(); + try (DataSourceWrapper sourceDataSource = getSourceDataSource(ruleConfig); Connection sourceConnection = sourceDataSource.getConnection(); - DataSourceWrapper targetDataSource = getTargetDataSource(jobConfig); + DataSourceWrapper targetDataSource = getTargetDataSource(ruleConfig); Connection targetConnection = targetDataSource.getConnection()) { - Collection<String> logicTableNames = getLogicTableNames(jobConfig.getRuleConfig().getSource().unwrap()); + Collection<String> logicTableNames = parameter.getTablesFirstDataNodes().getEntries().stream().map(JobDataNodeEntry::getLogicTableName).collect(Collectors.toList()); for (String each : logicTableNames) { String createTableSQL = getCreateTableSQL(sourceConnection, each); createTableSQL = addIfNotExistsForCreateTableSQL(createTableSQL); diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataSourcePreparerTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataSourcePreparerTest.java index cfd9b5b..40080e8 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataSourcePreparerTest.java +++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataSourcePreparerTest.java @@ -19,12 +19,11 @@ package org.apache.shardingsphere.scaling.mysql.component; import org.apache.shardingsphere.infra.config.datasource.typed.ShardingSphereJDBCDataSourceConfiguration; import org.apache.shardingsphere.infra.config.datasource.typed.TypedDataSourceConfigurationWrap; -import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration; import org.apache.shardingsphere.scaling.core.common.exception.PrepareFailedException; -import org.apache.shardingsphere.scaling.core.config.JobConfiguration; import org.apache.shardingsphere.scaling.core.config.RuleConfiguration; +import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeLine; +import org.apache.shardingsphere.scaling.core.job.preparer.PrepareTargetTablesParameter; import org.apache.shardingsphere.scaling.mysql.component.checker.MySQLDataSourcePreparer; -import org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -42,7 +41,7 @@ import static org.mockito.Mockito.when; public final class MySQLDataSourcePreparerTest { @Mock - private JobConfiguration jobConfiguration; + private PrepareTargetTablesParameter prepareTargetTablesParameter; @Mock private RuleConfiguration ruleConfiguration; @@ -65,37 +64,30 @@ public final class MySQLDataSourcePreparerTest { @Mock(extraInterfaces = AutoCloseable.class) private DataSource targetDataSource; - @Mock - private YamlRootConfiguration yamlRootConfiguration; - - @Mock - private YamlShardingRuleConfiguration yamlShardingRuleConfiguration; - @Before public void setUp() throws SQLException { - when(jobConfiguration.getRuleConfig()).thenReturn(ruleConfiguration); + when(prepareTargetTablesParameter.getRuleConfig()).thenReturn(ruleConfiguration); + when(prepareTargetTablesParameter.getTablesFirstDataNodes()).thenReturn(new JobDataNodeLine(Collections.emptyList())); when(ruleConfiguration.getSource()).thenReturn(sourceDataSourceConfigurationWrap); when(sourceDataSourceConfigurationWrap.unwrap()).thenReturn(sourceScalingDataSourceConfiguration); when(sourceScalingDataSourceConfiguration.toDataSource()).thenReturn(sourceDataSource); - when(sourceScalingDataSourceConfiguration.getRootConfig()).thenReturn(yamlRootConfiguration); - when(yamlRootConfiguration.getRules()).thenReturn(Collections.singletonList(yamlShardingRuleConfiguration)); when(ruleConfiguration.getTarget()).thenReturn(targetDataSourceConfigurationWrap); when(targetDataSourceConfigurationWrap.unwrap()).thenReturn(targetScalingDataSourceConfiguration); when(targetScalingDataSourceConfiguration.toDataSource()).thenReturn(targetDataSource); } - + @Test public void assertGetConnection() throws SQLException { MySQLDataSourcePreparer mySQLDataSourcePreparer = new MySQLDataSourcePreparer(); - mySQLDataSourcePreparer.prepareTargetTables(jobConfiguration); + mySQLDataSourcePreparer.prepareTargetTables(prepareTargetTablesParameter); verify(sourceDataSource).getConnection(); verify(targetDataSource).getConnection(); } - + @Test(expected = PrepareFailedException.class) public void assertThrowPrepareFailedException() throws SQLException { when(sourceDataSource.getConnection()).thenThrow(SQLException.class); MySQLDataSourcePreparer mySQLDataSourcePreparer = new MySQLDataSourcePreparer(); - mySQLDataSourcePreparer.prepareTargetTables(jobConfiguration); + mySQLDataSourcePreparer.prepareTargetTables(prepareTargetTablesParameter); } } diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java index 630ee3e..12d277c 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java +++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java @@ -18,14 +18,17 @@ package org.apache.shardingsphere.scaling.opengauss.component.checker; import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceManager; import org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceWrapper; +import org.apache.shardingsphere.infra.config.datasource.typed.ShardingSphereJDBCDataSourceConfiguration; +import org.apache.shardingsphere.infra.datanode.DataNode; import org.apache.shardingsphere.migration.common.job.preparer.AbstractDataSourcePreparer; import org.apache.shardingsphere.scaling.core.common.exception.PrepareFailedException; -import org.apache.shardingsphere.scaling.core.config.JobConfiguration; +import org.apache.shardingsphere.scaling.core.config.internal.JobDataNodeEntry; import org.apache.shardingsphere.scaling.core.job.preparer.ActualTableDefinition; +import org.apache.shardingsphere.scaling.core.job.preparer.PrepareTargetTablesParameter; import org.apache.shardingsphere.scaling.core.job.preparer.TableDefinitionSQLType; -import javax.sql.DataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -48,15 +51,15 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare private static final String WITH_OF_TABLE_EXTEND = "with ("; @Override - public void prepareTargetTables(final JobConfiguration jobConfig) { + public void prepareTargetTables(final PrepareTargetTablesParameter parameter) { Collection<ActualTableDefinition> actualTableDefinitions; try { - actualTableDefinitions = getActualTableDefinitions(jobConfig); + actualTableDefinitions = getActualTableDefinitions(parameter); } catch (final SQLException ex) { throw new PrepareFailedException("get table definitions failed.", ex); } Map<String, Collection<String>> createLogicTableSQLs = getCreateLogicTableSQLs(actualTableDefinitions); - try (DataSourceWrapper targetDataSource = getTargetDataSource(jobConfig); + try (DataSourceWrapper targetDataSource = getTargetDataSource(parameter.getRuleConfig()); Connection targetConnection = targetDataSource.getConnection()) { for (Entry<String, Collection<String>> entry : createLogicTableSQLs.entrySet()) { for (String each : entry.getValue()) { @@ -69,17 +72,19 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare } } - private Collection<ActualTableDefinition> getActualTableDefinitions(final JobConfiguration jobConfig) throws SQLException { + private Collection<ActualTableDefinition> getActualTableDefinitions(final PrepareTargetTablesParameter parameter) throws SQLException { Collection<ActualTableDefinition> result = new ArrayList<>(); - Map<DataSource, Map<String, String>> dataSourceTableNamesMap = getDataSourceTableNamesMap(jobConfig.getRuleConfig().getSource().unwrap()); - for (Entry<DataSource, Map<String, String>> entry : dataSourceTableNamesMap.entrySet()) { - try (DataSourceWrapper dataSource = new DataSourceWrapper(entry.getKey()); - Connection sourceConnection = dataSource.getConnection()) { - for (Entry<String, String> tableNameEntry : entry.getValue().entrySet()) { - String actualTableName = tableNameEntry.getKey(); + ShardingSphereJDBCDataSourceConfiguration sourceConfig = (ShardingSphereJDBCDataSourceConfiguration) parameter.getRuleConfig().getSource().unwrap(); + try (DataSourceManager dataSourceManager = new DataSourceManager()) { + for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) { + DataNode dataNode = each.getDataNodes().get(0); + // Keep dataSource to reuse + DataSourceWrapper dataSource = dataSourceManager.getDataSource(sourceConfig.getActualDataSourceConfig(dataNode.getDataSourceName())); + try (Connection sourceConnection = dataSource.getConnection()) { + String actualTableName = dataNode.getTableName(); int oid = queryTableOid(sourceConnection, actualTableName); String tableDefinition = queryTableDefinition(sourceConnection, oid); - String logicTableName = tableNameEntry.getValue(); + String logicTableName = each.getLogicTableName(); result.add(new ActualTableDefinition(logicTableName, actualTableName, tableDefinition)); } } @@ -158,7 +163,7 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare String[] search = {WITH_OF_TABLE_EXTEND, ")"}; List<Integer> searchPos = new ArrayList<>(2); int startPos = 0; - for (String each: search) { + for (String each : search) { int curSearch = lowerCreateSQL.indexOf(each, startPos); if (curSearch <= 0) { break;