This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8ff51c0 Rename RuleConfiguration in pipeline module to
PipelineConfiguration (#14322)
8ff51c0 is described below
commit 8ff51c0c0518353b5bc440e9d32cc71601c3a4fe
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 25 18:42:35 2021 +0800
Rename RuleConfiguration in pipeline module to PipelineConfiguration
(#14322)
* Rename RuleConfiguration in pipeline module to PipelineConfiguration
* Revert erroneous IDEA auto-rename
---
...hardingRuleAlteredJobConfigurationPreparer.java | 30 +++++++++++-----------
.../api/config/rulealtered/JobConfiguration.java | 18 +++++++------
...nfiguration.java => PipelineConfiguration.java} | 4 +--
.../datasource/PrepareTargetTablesParameter.java | 4 +--
.../pipeline/core/api/impl/PipelineJobAPIImpl.java | 2 +-
.../consistency/DataConsistencyCheckerImpl.java | 8 +++---
.../datasource/AbstractDataSourcePreparer.java | 10 ++++----
.../rulealtered/RuleAlteredJobPreparer.java | 2 +-
.../scenario/rulealtered/RuleAlteredJobWorker.java | 14 +++++-----
.../RuleAlteredJobConfigurationPreparer.java | 10 ++++----
.../job/environment/ScalingEnvironmentManager.java | 5 ++--
.../datasource/MySQLDataSourcePreparer.java | 8 +++---
.../datasource/MySQLDataSourcePreparerTest.java | 10 ++++----
.../datasource/OpenGaussDataSourcePreparer.java | 4 +--
.../test/mysql/env/ITEnvironmentContext.java | 10 ++++----
.../pipeline/api/impl/PipelineJobAPIImplTest.java | 14 +++++-----
.../core/datasource/DataSourceManagerTest.java | 6 ++---
.../data/pipeline/core/util/ResourceUtil.java | 10 ++++----
18 files changed, 86 insertions(+), 83 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
index d244975..642ce3f 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -25,7 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.HandleConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
@@ -71,9 +71,9 @@ import java.util.stream.Collectors;
public final class ShardingRuleAlteredJobConfigurationPreparer implements
RuleAlteredJobConfigurationPreparer {
@Override
- public HandleConfiguration createHandleConfiguration(final
RuleConfiguration ruleConfig) {
+ public HandleConfiguration createHandleConfiguration(final
PipelineConfiguration pipelineConfig) {
HandleConfiguration result = new HandleConfiguration();
- Map<String, List<DataNode>> shouldScalingActualDataNodes =
getShouldScalingActualDataNodes(ruleConfig);
+ Map<String, List<DataNode>> shouldScalingActualDataNodes =
getShouldScalingActualDataNodes(pipelineConfig);
Collection<DataNode> dataNodes = new ArrayList<>();
for (Entry<String, List<DataNode>> entry :
shouldScalingActualDataNodes.entrySet()) {
dataNodes.addAll(entry.getValue());
@@ -84,8 +84,8 @@ public final class
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
return result;
}
- private static Map<String, List<DataNode>>
getShouldScalingActualDataNodes(final RuleConfiguration ruleConfig) {
- JDBCDataSourceConfiguration sourceConfig =
JDBCDataSourceConfigurationFactory.newInstance(ruleConfig.getSource().getType(),
ruleConfig.getSource().getParameter());
+ private static Map<String, List<DataNode>>
getShouldScalingActualDataNodes(final PipelineConfiguration pipelineConfig) {
+ JDBCDataSourceConfiguration sourceConfig =
JDBCDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(),
pipelineConfig.getSource().getParameter());
Preconditions.checkState(sourceConfig instanceof
ShardingSphereJDBCDataSourceConfiguration,
"Only ShardingSphereJdbc type of source
TypedDataSourceConfiguration is supported.");
ShardingSphereJDBCDataSourceConfiguration source =
(ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
@@ -121,19 +121,19 @@ public final class
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
}
@Override
- public Collection<TaskConfiguration> createTaskConfigurations(final
RuleConfiguration ruleConfig, final HandleConfiguration handleConfig) {
+ public Collection<TaskConfiguration> createTaskConfigurations(final
PipelineConfiguration pipelineConfig, final HandleConfiguration handleConfig) {
Collection<TaskConfiguration> result = new LinkedList<>();
- ShardingSphereJDBCDataSourceConfiguration sourceConfig =
getSourceConfiguration(ruleConfig);
+ ShardingSphereJDBCDataSourceConfiguration sourceConfig =
getSourceConfiguration(pipelineConfig);
ShardingRuleConfiguration sourceRuleConfig =
ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(sourceConfig.getRootConfig().getRules());
Map<String, DataSourceConfiguration> sourceDataSource = new
YamlDataSourceConfigurationSwapper().getDataSourceConfigurations(sourceConfig.getRootConfig());
Map<String, Map<String, String>> dataSourceTableNameMap =
toDataSourceTableNameMap(new ShardingRule(sourceRuleConfig,
sourceConfig.getRootConfig().getDataSources().keySet()));
- Optional<ShardingRuleConfiguration> targetRuleConfig =
getTargetRuleConfiguration(ruleConfig);
+ Optional<ShardingRuleConfiguration> targetRuleConfig =
getTargetRuleConfiguration(pipelineConfig);
filterByShardingDataSourceTables(dataSourceTableNameMap, handleConfig);
Map<String, Set<String>> shardingColumnsMap =
getShardingColumnsMap(targetRuleConfig.orElse(sourceRuleConfig));
for (Entry<String, Map<String, String>> entry :
dataSourceTableNameMap.entrySet()) {
OnRuleAlteredActionConfiguration ruleAlteredActionConfig =
getRuleAlteredActionConfig(targetRuleConfig.orElse(sourceRuleConfig)).orElse(null);
DumperConfiguration dumperConfig =
createDumperConfig(entry.getKey(),
sourceDataSource.get(entry.getKey()).getProps(), entry.getValue(),
ruleAlteredActionConfig);
- ImporterConfiguration importerConfig =
createImporterConfig(ruleConfig, handleConfig, shardingColumnsMap);
+ ImporterConfiguration importerConfig =
createImporterConfig(pipelineConfig, handleConfig, shardingColumnsMap);
TaskConfiguration taskConfig = new TaskConfiguration(handleConfig,
dumperConfig, importerConfig);
log.info("toTaskConfigs, dataSourceName={}, taskConfig={}",
entry.getKey(), taskConfig);
result.add(taskConfig);
@@ -145,14 +145,14 @@ public final class
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
return
Optional.ofNullable(shardingRuleConfig.getScaling().get(shardingRuleConfig.getScalingName()));
}
- private static ShardingSphereJDBCDataSourceConfiguration
getSourceConfiguration(final RuleConfiguration ruleConfig) {
- JDBCDataSourceConfiguration result =
JDBCDataSourceConfigurationFactory.newInstance(ruleConfig.getSource().getType(),
ruleConfig.getSource().getParameter());
+ private static ShardingSphereJDBCDataSourceConfiguration
getSourceConfiguration(final PipelineConfiguration pipelineConfig) {
+ JDBCDataSourceConfiguration result =
JDBCDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(),
pipelineConfig.getSource().getParameter());
Preconditions.checkArgument(result instanceof
ShardingSphereJDBCDataSourceConfiguration, "Only support ShardingSphere source
data source.");
return (ShardingSphereJDBCDataSourceConfiguration) result;
}
- private static Optional<ShardingRuleConfiguration>
getTargetRuleConfiguration(final RuleConfiguration ruleConfig) {
- JDBCDataSourceConfiguration dataSourceConfig =
JDBCDataSourceConfigurationFactory.newInstance(ruleConfig.getTarget().getType(),
ruleConfig.getTarget().getParameter());
+ private static Optional<ShardingRuleConfiguration>
getTargetRuleConfiguration(final PipelineConfiguration pipelineConfig) {
+ JDBCDataSourceConfiguration dataSourceConfig =
JDBCDataSourceConfigurationFactory.newInstance(pipelineConfig.getTarget().getType(),
pipelineConfig.getTarget().getParameter());
if (dataSourceConfig instanceof
ShardingSphereJDBCDataSourceConfiguration) {
return Optional.of(
ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(((ShardingSphereJDBCDataSourceConfiguration)
dataSourceConfig).getRootConfig().getRules()));
@@ -274,9 +274,9 @@ public final class
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
return result;
}
- private static ImporterConfiguration createImporterConfig(final
RuleConfiguration ruleConfig, final HandleConfiguration handleConfig, final
Map<String, Set<String>> shardingColumnsMap) {
+ private static ImporterConfiguration createImporterConfig(final
PipelineConfiguration pipelineConfig, final HandleConfiguration handleConfig,
final Map<String, Set<String>> shardingColumnsMap) {
ImporterConfiguration result = new ImporterConfiguration();
-
result.setDataSourceConfig(JDBCDataSourceConfigurationFactory.newInstance(ruleConfig.getTarget().getType(),
ruleConfig.getTarget().getParameter()));
+
result.setDataSourceConfig(JDBCDataSourceConfigurationFactory.newInstance(pipelineConfig.getTarget().getType(),
pipelineConfig.getTarget().getParameter()));
result.setShardingColumnsMap(shardingColumnsMap);
result.setRetryTimes(handleConfig.getRetryTimes());
return result;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/JobConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/JobConfiguration.java
index 90d7c8b..dbac588 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/JobConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/JobConfiguration.java
@@ -49,36 +49,38 @@ public final class JobConfiguration {
private WorkflowConfiguration workflowConfig;
- private RuleConfiguration ruleConfig;
+ private PipelineConfiguration pipelineConfig;
private HandleConfiguration handleConfig;
- public JobConfiguration(final WorkflowConfiguration workflowConfig, final
RuleConfiguration ruleConfig) {
+ public JobConfiguration(final WorkflowConfiguration workflowConfig, final
PipelineConfiguration pipelineConfig) {
this.workflowConfig = workflowConfig;
- this.ruleConfig = ruleConfig;
+ this.pipelineConfig = pipelineConfig;
}
/**
* Build handle configuration.
*/
public void buildHandleConfig() {
- RuleConfiguration ruleConfig = getRuleConfig();
+ PipelineConfiguration pipelineConfig = getPipelineConfig();
HandleConfiguration handleConfig = getHandleConfig();
if (null == handleConfig || null ==
handleConfig.getJobShardingDataNodes()) {
// TODO singleton
RuleAlteredJobConfigurationPreparer preparer =
RequiredSPIRegistry.getRegisteredService(RuleAlteredJobConfigurationPreparer.class);
- handleConfig = preparer.createHandleConfiguration(ruleConfig);
+ handleConfig = preparer.createHandleConfiguration(pipelineConfig);
this.handleConfig = handleConfig;
}
if (null == handleConfig.getJobId()) {
handleConfig.setJobId(System.nanoTime() -
ThreadLocalRandom.current().nextLong(100_0000));
}
if (Strings.isNullOrEmpty(handleConfig.getSourceDatabaseType())) {
- JDBCDataSourceConfiguration sourceDataSourceConfig =
JDBCDataSourceConfigurationFactory.newInstance(getRuleConfig().getSource().getType(),
getRuleConfig().getSource().getParameter());
+ JDBCDataSourceConfiguration sourceDataSourceConfig =
JDBCDataSourceConfigurationFactory.newInstance(
+ getPipelineConfig().getSource().getType(),
getPipelineConfig().getSource().getParameter());
handleConfig.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getName());
}
if (Strings.isNullOrEmpty(handleConfig.getTargetDatabaseType())) {
- JDBCDataSourceConfiguration targetDataSourceConfig =
JDBCDataSourceConfigurationFactory.newInstance(getRuleConfig().getTarget().getType(),
getRuleConfig().getTarget().getParameter());
+ JDBCDataSourceConfiguration targetDataSourceConfig =
JDBCDataSourceConfigurationFactory.newInstance(
+ getPipelineConfig().getTarget().getType(),
getPipelineConfig().getTarget().getParameter());
handleConfig.setTargetDatabaseType(targetDataSourceConfig.getDatabaseType().getName());
}
if (null == handleConfig.getJobShardingItem()) {
@@ -93,6 +95,6 @@ public final class JobConfiguration {
*/
public Collection<TaskConfiguration> buildTaskConfigs() {
RuleAlteredJobConfigurationPreparer preparer =
RequiredSPIRegistry.getRegisteredService(RuleAlteredJobConfigurationPreparer.class);
- return preparer.createTaskConfigurations(ruleConfig, handleConfig);
+ return preparer.createTaskConfigurations(pipelineConfig, handleConfig);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/PipelineConfiguration.java
similarity index 96%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleConfiguration.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/PipelineConfiguration.java
index 86dc536..f0965d8 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/PipelineConfiguration.java
@@ -22,10 +22,10 @@ import lombok.Getter;
import
org.apache.shardingsphere.infra.config.datasource.jdbc.config.yaml.YamlJDBCDataSourceConfiguration;
/**
- * Rule configuration.
+ * Pipeline configuration.
*/
@Getter
-public final class RuleConfiguration {
+public final class PipelineConfiguration {
private YamlJDBCDataSourceConfiguration source;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/prepare/datasource/PrepareTargetTablesParameter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/prepare/datasource/PrepareTargetTablesParameter.java
index 3ff9b9c..3b1b202 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/prepare/datasource/PrepareTargetTablesParameter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/prepare/datasource/PrepareTargetTablesParameter.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.api.prepare.datasource;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
/**
@@ -34,5 +34,5 @@ public final class PrepareTargetTablesParameter {
private final JobDataNodeLine tablesFirstDataNodes;
@NonNull
- private final RuleConfiguration ruleConfig;
+ private final PipelineConfiguration pipelineConfiguration;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineJobAPIImpl.java
index 35c4b8f..269db26 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineJobAPIImpl.java
@@ -288,7 +288,7 @@ public final class PipelineJobAPIImpl implements
PipelineJobAPI {
}
Optional<Collection<RuleAlteredJobContext>> optionalJobContexts =
RuleAlteredJobSchedulerCenter.getJobContexts(jobId);
optionalJobContexts.ifPresent(jobContexts -> jobContexts.forEach(each
-> each.setStatus(JobStatus.ALMOST_FINISHED)));
- YamlRootConfiguration yamlRootConfig =
YamlEngine.unmarshal(jobConfig.getRuleConfig().getTarget().getParameter(),
YamlRootConfiguration.class);
+ YamlRootConfiguration yamlRootConfig =
YamlEngine.unmarshal(jobConfig.getPipelineConfig().getTarget().getParameter(),
YamlRootConfiguration.class);
WorkflowConfiguration workflowConfig = jobConfig.getWorkflowConfig();
String schemaName = workflowConfig.getSchemaName();
String ruleCacheId = workflowConfig.getRuleCacheId();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index 8de0eab..eed408a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -76,9 +76,9 @@ public final class DataConsistencyCheckerImpl implements
DataConsistencyChecker
ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job"
+ jobContext.getJobId() % 10_000 + "-countCheck-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
JDBCDataSourceConfiguration sourceConfig =
JDBCDataSourceConfigurationFactory.newInstance(
- jobContext.getJobConfig().getRuleConfig().getSource().getType(),
jobContext.getJobConfig().getRuleConfig().getSource().getParameter());
+
jobContext.getJobConfig().getPipelineConfig().getSource().getType(),
jobContext.getJobConfig().getPipelineConfig().getSource().getParameter());
JDBCDataSourceConfiguration targetConfig =
JDBCDataSourceConfigurationFactory.newInstance(
- jobContext.getJobConfig().getRuleConfig().getTarget().getType(),
jobContext.getJobConfig().getRuleConfig().getTarget().getParameter());
+
jobContext.getJobConfig().getPipelineConfig().getTarget().getType(),
jobContext.getJobConfig().getPipelineConfig().getTarget().getParameter());
try (DataSourceWrapper sourceDataSource =
dataSourceFactory.newInstance(sourceConfig);
DataSourceWrapper targetDataSource =
dataSourceFactory.newInstance(targetConfig)) {
return jobContext.getTaskConfigs()
@@ -120,10 +120,10 @@ public final class DataConsistencyCheckerImpl implements
DataConsistencyChecker
public Map<String, Boolean> checkRecordsContent(final
DataConsistencyCheckAlgorithm checkAlgorithm) {
Collection<String> supportedDatabaseTypes =
checkAlgorithm.getSupportedDatabaseTypes();
JDBCDataSourceConfiguration sourceConfig =
JDBCDataSourceConfigurationFactory.newInstance(
-
jobContext.getJobConfig().getRuleConfig().getSource().getType(),
jobContext.getJobConfig().getRuleConfig().getSource().getParameter());
+
jobContext.getJobConfig().getPipelineConfig().getSource().getType(),
jobContext.getJobConfig().getPipelineConfig().getSource().getParameter());
checkDatabaseTypeSupportedOrNot(supportedDatabaseTypes,
sourceConfig.getDatabaseType().getName());
JDBCDataSourceConfiguration targetConfig =
JDBCDataSourceConfigurationFactory.newInstance(
-
jobContext.getJobConfig().getRuleConfig().getTarget().getType(),
jobContext.getJobConfig().getRuleConfig().getTarget().getParameter());
+
jobContext.getJobConfig().getPipelineConfig().getTarget().getType(),
jobContext.getJobConfig().getPipelineConfig().getTarget().getParameter());
checkDatabaseTypeSupportedOrNot(supportedDatabaseTypes,
targetConfig.getDatabaseType().getName());
Collection<String> logicTableNames =
jobContext.getTaskConfigs().stream().flatMap(each ->
each.getDumperConfig().getTableNameMap().values().stream()).distinct().collect(Collectors.toList());
Map<String, TableMetaData> tableMetaDataMap =
getTablesColumnsMap(sourceConfig, logicTableNames);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index a52d4a2..a31b28c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.prepare.datasource.ActualTableDefinition;
import
org.apache.shardingsphere.data.pipeline.api.prepare.datasource.TableDefinitionSQLType;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceFactory;
@@ -50,12 +50,12 @@ public abstract class AbstractDataSourcePreparer implements
DataSourcePreparer {
private final DataSourceFactory dataSourceFactory = new
DataSourceFactory();
- protected final DataSourceWrapper getSourceDataSource(final
RuleConfiguration ruleConfig) {
- return
dataSourceFactory.newInstance(JDBCDataSourceConfigurationFactory.newInstance(ruleConfig.getSource().getType(),
ruleConfig.getSource().getParameter()));
+ protected final DataSourceWrapper getSourceDataSource(final
PipelineConfiguration pipelineConfig) {
+ return
dataSourceFactory.newInstance(JDBCDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(),
pipelineConfig.getSource().getParameter()));
}
- protected final DataSourceWrapper getTargetDataSource(final
RuleConfiguration ruleConfig) {
- return
dataSourceFactory.newInstance(JDBCDataSourceConfigurationFactory.newInstance(ruleConfig.getTarget().getType(),
ruleConfig.getTarget().getParameter()));
+ protected final DataSourceWrapper getTargetDataSource(final
PipelineConfiguration pipelineConfig) {
+ return
dataSourceFactory.newInstance(JDBCDataSourceConfigurationFactory.newInstance(pipelineConfig.getTarget().getType(),
pipelineConfig.getTarget().getParameter()));
}
protected final void executeTargetTableSQL(final Connection
targetConnection, final String sql) throws SQLException {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index b72bead..7271bfa 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -79,7 +79,7 @@ public final class RuleAlteredJobPreparer {
return;
}
JobDataNodeLine tablesFirstDataNodes =
JobDataNodeLine.unmarshal(jobConfig.getHandleConfig().getTablesFirstDataNodes());
- PrepareTargetTablesParameter prepareTargetTablesParameter = new
PrepareTargetTablesParameter(tablesFirstDataNodes, jobConfig.getRuleConfig());
+ PrepareTargetTablesParameter prepareTargetTablesParameter = new
PrepareTargetTablesParameter(tablesFirstDataNodes,
jobConfig.getPipelineConfig());
dataSourcePreparer.prepareTargetTables(prepareTargetTablesParameter);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 2df03b1..5b848a5 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -23,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import
org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
@@ -152,12 +152,12 @@ public final class RuleAlteredJobWorker {
*/
private static YamlRootConfiguration getYamlRootConfig(final
JobConfiguration jobConfig) {
JDBCDataSourceConfiguration targetDataSourceConfig =
JDBCDataSourceConfigurationFactory.newInstance(
- jobConfig.getRuleConfig().getTarget().getType(),
jobConfig.getRuleConfig().getTarget().getParameter());
+ jobConfig.getPipelineConfig().getTarget().getType(),
jobConfig.getPipelineConfig().getTarget().getParameter());
if (targetDataSourceConfig instanceof
ShardingSphereJDBCDataSourceConfiguration) {
return ((ShardingSphereJDBCDataSourceConfiguration)
targetDataSourceConfig).getRootConfig();
}
JDBCDataSourceConfiguration sourceDataSourceConfig =
JDBCDataSourceConfigurationFactory.newInstance(
- jobConfig.getRuleConfig().getSource().getType(),
jobConfig.getRuleConfig().getSource().getParameter());
+ jobConfig.getPipelineConfig().getSource().getType(),
jobConfig.getPipelineConfig().getSource().getParameter());
return ((ShardingSphereJDBCDataSourceConfiguration)
sourceDataSourceConfig).getRootConfig();
}
@@ -204,8 +204,8 @@ public final class RuleAlteredJobWorker {
throw new PipelineJobCreationException("more than 1 rule altered");
}
WorkflowConfiguration workflowConfig = new
WorkflowConfiguration(event.getSchemaName(), new
ArrayList<>(alteredRuleYamlClassNames), event.getRuleCacheId());
- RuleConfiguration ruleConfig = getRuleConfiguration(sourceRootConfig,
targetRootConfig);
- return Optional.of(new JobConfiguration(workflowConfig, ruleConfig));
+ PipelineConfiguration pipelineConfig =
getPipelineConfiguration(sourceRootConfig, targetRootConfig);
+ return Optional.of(new JobConfiguration(workflowConfig,
pipelineConfig));
}
private Collection<Pair<YamlRuleConfiguration, YamlRuleConfiguration>>
groupSourceTargetRuleConfigsByType(
@@ -225,8 +225,8 @@ public final class RuleAlteredJobWorker {
return result;
}
- private RuleConfiguration getRuleConfiguration(final YamlRootConfiguration
sourceRootConfig, final YamlRootConfiguration targetRootConfig) {
- RuleConfiguration result = new RuleConfiguration();
+ private PipelineConfiguration getPipelineConfiguration(final
YamlRootConfiguration sourceRootConfig, final YamlRootConfiguration
targetRootConfig) {
+ PipelineConfiguration result = new PipelineConfiguration();
result.setSource(createYamlJDBCDataSourceConfiguration(sourceRootConfig));
result.setTarget(createYamlJDBCDataSourceConfiguration(targetRootConfig));
return result;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
index 46294d4..90566b2 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.HandleConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import org.apache.shardingsphere.spi.required.RequiredSPI;
@@ -33,17 +33,17 @@ public interface RuleAlteredJobConfigurationPreparer
extends RequiredSPI {
/**
* Create handle configuration, used to build job configuration.
*
- * @param ruleConfig rule configuration
+ * @param pipelineConfig pipeline configuration
* @return handle configuration
*/
- HandleConfiguration createHandleConfiguration(RuleConfiguration
ruleConfig);
+ HandleConfiguration createHandleConfiguration(PipelineConfiguration
pipelineConfig);
/**
* Create task configurations, used by underlying scheduler.
*
- * @param ruleConfig rule configuration
+ * @param pipelineConfig pipeline configuration
* @param handleConfig handle configuration
* @return task configurations
*/
- Collection<TaskConfiguration> createTaskConfigurations(RuleConfiguration
ruleConfig, HandleConfiguration handleConfig);
+ Collection<TaskConfiguration>
createTaskConfigurations(PipelineConfiguration pipelineConfig,
HandleConfiguration handleConfig);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
index d0d9d8e..7c4c16c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
@@ -45,8 +45,9 @@ public final class ScalingEnvironmentManager {
// TODO seems it should be removed, dangerous to use
public void resetTargetTable(final RuleAlteredJobContext jobContext)
throws SQLException {
Collection<String> tables =
jobContext.getTaskConfigs().stream().flatMap(each ->
each.getDumperConfig().getTableNameMap().values().stream()).collect(Collectors.toSet());
- try (DataSourceWrapper dataSource = dataSourceFactory.newInstance(
-
JDBCDataSourceConfigurationFactory.newInstance(jobContext.getJobConfig().getRuleConfig().getTarget().getType(),
jobContext.getJobConfig().getRuleConfig().getTarget().getParameter()));
+ try (DataSourceWrapper dataSource =
dataSourceFactory.newInstance(JDBCDataSourceConfigurationFactory.newInstance(
+
jobContext.getJobConfig().getPipelineConfig().getTarget().getType(),
+
jobContext.getJobConfig().getPipelineConfig().getTarget().getParameter()));
Connection connection = dataSource.getConnection()) {
for (String each : tables) {
String sql =
ScalingSQLBuilderFactory.newInstance(jobContext.getJobConfig().getHandleConfig().getTargetDatabaseType()).buildTruncateSQL(each);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index a325994..ced10ab 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
import
org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceWrapper;
@@ -44,10 +44,10 @@ public final class MySQLDataSourcePreparer extends
AbstractDataSourcePreparer {
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter
parameter) {
- RuleConfiguration ruleConfig = parameter.getRuleConfig();
- try (DataSourceWrapper sourceDataSource =
getSourceDataSource(ruleConfig);
+ PipelineConfiguration pipelineConfig =
parameter.getPipelineConfiguration();
+ try (DataSourceWrapper sourceDataSource =
getSourceDataSource(pipelineConfig);
Connection sourceConnection = sourceDataSource.getConnection();
- DataSourceWrapper targetDataSource =
getTargetDataSource(ruleConfig);
+ DataSourceWrapper targetDataSource =
getTargetDataSource(pipelineConfig);
Connection targetConnection = targetDataSource.getConnection()) {
Collection<String> logicTableNames =
parameter.getTablesFirstDataNodes().getEntries().stream().map(JobDataNodeEntry::getLogicTableName).collect(Collectors.toList());
for (String each : logicTableNames) {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index 5332675..04db249 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
-import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
@@ -46,7 +46,7 @@ public final class MySQLDataSourcePreparerTest {
private PrepareTargetTablesParameter prepareTargetTablesParameter;
@Mock
- private RuleConfiguration ruleConfig;
+ private PipelineConfiguration pipelineConfig;
@Mock
private YamlJDBCDataSourceConfiguration
sourceYamlJDBCDataSourceConfiguration;
@@ -68,12 +68,12 @@ public final class MySQLDataSourcePreparerTest {
@Before
public void setUp() throws SQLException {
-
when(prepareTargetTablesParameter.getRuleConfig()).thenReturn(ruleConfig);
+
when(prepareTargetTablesParameter.getPipelineConfiguration()).thenReturn(pipelineConfig);
when(prepareTargetTablesParameter.getTablesFirstDataNodes()).thenReturn(new
JobDataNodeLine(Collections.emptyList()));
-
when(ruleConfig.getSource()).thenReturn(sourceYamlJDBCDataSourceConfiguration);
+
when(pipelineConfig.getSource()).thenReturn(sourceYamlJDBCDataSourceConfiguration);
when(JDBCDataSourceCreatorFactory.getInstance(
sourceScalingDataSourceConfig.getType()).createDataSource(sourceScalingDataSourceConfig.getDataSourceConfiguration())).thenReturn(sourceDataSource);
-
when(ruleConfig.getTarget()).thenReturn(targetYamlJDBCDataSourceConfiguration);
+
when(pipelineConfig.getTarget()).thenReturn(targetYamlJDBCDataSourceConfiguration);
when(JDBCDataSourceCreatorFactory.getInstance(
targetScalingDataSourceConfig.getType()).createDataSource(targetScalingDataSourceConfig.getDataSourceConfiguration())).thenReturn(targetDataSource);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
index bf91da9..64c86c3 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
@@ -60,7 +60,7 @@ public final class OpenGaussDataSourcePreparer extends
AbstractDataSourcePrepare
throw new PipelineJobPrepareFailedException("get table definitions
failed.", ex);
}
Map<String, Collection<String>> createLogicTableSQLs =
getCreateLogicTableSQLs(actualTableDefinitions);
- try (DataSourceWrapper targetDataSource =
getTargetDataSource(parameter.getRuleConfig());
+ try (DataSourceWrapper targetDataSource =
getTargetDataSource(parameter.getPipelineConfiguration());
Connection targetConnection = targetDataSource.getConnection()) {
for (Entry<String, Collection<String>> entry :
createLogicTableSQLs.entrySet()) {
for (String each : entry.getValue()) {
@@ -76,7 +76,7 @@ public final class OpenGaussDataSourcePreparer extends
AbstractDataSourcePrepare
private Collection<ActualTableDefinition> getActualTableDefinitions(final
PrepareTargetTablesParameter parameter) throws SQLException {
Collection<ActualTableDefinition> result = new ArrayList<>();
ShardingSphereJDBCDataSourceConfiguration sourceConfig =
(ShardingSphereJDBCDataSourceConfiguration)
JDBCDataSourceConfigurationFactory.newInstance(
- parameter.getRuleConfig().getSource().getType(),
parameter.getRuleConfig().getSource().getParameter());
+ parameter.getPipelineConfiguration().getSource().getType(),
parameter.getPipelineConfiguration().getSource().getParameter());
try (DataSourceManager dataSourceManager = new DataSourceManager()) {
for (JobDataNodeEntry each :
parameter.getTablesFirstDataNodes().getEntries()) {
DataNode dataNode = each.getDataNodes().get(0);
diff --git
a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/ITEnvironmentContext.java
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/ITEnvironmentContext.java
index 7aabfd3..6b122d1 100644
---
a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/ITEnvironmentContext.java
+++
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/ITEnvironmentContext.java
@@ -21,7 +21,7 @@ import com.google.gson.Gson;
import lombok.Getter;
import lombok.SneakyThrows;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.jdbc.config.JDBCDataSourceConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.jdbc.config.yaml.YamlJDBCDataSourceConfiguration;
import
org.apache.shardingsphere.integration.scaling.test.mysql.env.cases.DataSet;
@@ -81,11 +81,11 @@ public final class ITEnvironmentContext {
}
private static String createScalingConfiguration(final Map<String,
YamlTableRuleConfiguration> tableRules) {
- RuleConfiguration ruleConfig = new RuleConfiguration();
-
ruleConfig.setSource(createYamlJDBCDataSourceConfiguration(SourceConfiguration.getDockerConfiguration(tableRules)));
-
ruleConfig.setTarget(createYamlJDBCDataSourceConfiguration(TargetConfiguration.getDockerConfiguration()));
+ PipelineConfiguration pipelineConfig = new PipelineConfiguration();
+
pipelineConfig.setSource(createYamlJDBCDataSourceConfiguration(SourceConfiguration.getDockerConfiguration(tableRules)));
+
pipelineConfig.setTarget(createYamlJDBCDataSourceConfiguration(TargetConfiguration.getDockerConfiguration()));
JobConfiguration jobConfig = new JobConfiguration();
- jobConfig.setRuleConfig(ruleConfig);
+ jobConfig.setPipelineConfig(pipelineConfig);
return new Gson().toJson(jobConfig);
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
index 59cb3e9..026e108 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
@@ -23,7 +23,7 @@ import
org.apache.shardingsphere.data.pipeline.api.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
@@ -139,7 +139,7 @@ public final class PipelineJobAPIImplTest {
Optional<Long> jobId =
pipelineJobAPI.start(ResourceUtil.mockJobConfig());
assertTrue(jobId.isPresent());
JobConfiguration jobConfig = pipelineJobAPI.getJobConfig(jobId.get());
- initTableData(jobConfig.getRuleConfig());
+ initTableData(jobConfig.getPipelineConfig());
Map<String, DataConsistencyCheckResult> checkResultMap =
pipelineJobAPI.dataConsistencyCheck(jobId.get());
assertThat(checkResultMap.size(), is(1));
}
@@ -149,7 +149,7 @@ public final class PipelineJobAPIImplTest {
Optional<Long> jobId =
pipelineJobAPI.start(ResourceUtil.mockJobConfig());
assertTrue(jobId.isPresent());
JobConfiguration jobConfig = pipelineJobAPI.getJobConfig(jobId.get());
- initTableData(jobConfig.getRuleConfig());
+ initTableData(jobConfig.getPipelineConfig());
Map<String, DataConsistencyCheckResult> checkResultMap =
pipelineJobAPI.dataConsistencyCheck(jobId.get(),
FixtureDataConsistencyCheckAlgorithm.TYPE);
assertThat(checkResultMap.size(), is(1));
assertTrue(checkResultMap.get("t_order").isRecordsCountMatched());
@@ -185,17 +185,17 @@ public final class PipelineJobAPIImplTest {
Optional<Long> jobId =
pipelineJobAPI.start(ResourceUtil.mockJobConfig());
assertTrue(jobId.isPresent());
JobConfiguration jobConfig = pipelineJobAPI.getJobConfig(jobId.get());
- initTableData(jobConfig.getRuleConfig());
+ initTableData(jobConfig.getPipelineConfig());
pipelineJobAPI.reset(jobId.get());
Map<String, DataConsistencyCheckResult> checkResultMap =
pipelineJobAPI.dataConsistencyCheck(jobId.get(),
FixtureDataConsistencyCheckAlgorithm.TYPE);
assertThat(checkResultMap.get("t_order").getTargetRecordsCount(),
is(0L));
}
@SneakyThrows(SQLException.class)
- private void initTableData(final RuleConfiguration ruleConfig) {
- JDBCDataSourceConfiguration sourceConfig =
JDBCDataSourceConfigurationFactory.newInstance(ruleConfig.getSource().getType(),
ruleConfig.getSource().getParameter());
+ private void initTableData(final PipelineConfiguration pipelineConfig) {
+ JDBCDataSourceConfiguration sourceConfig =
JDBCDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(),
pipelineConfig.getSource().getParameter());
initTableData(JDBCDataSourceCreatorFactory.getInstance(sourceConfig.getType()).createDataSource(sourceConfig.getDataSourceConfiguration()));
- JDBCDataSourceConfiguration targetConfig =
JDBCDataSourceConfigurationFactory.newInstance(ruleConfig.getTarget().getType(),
ruleConfig.getTarget().getParameter());
+ JDBCDataSourceConfiguration targetConfig =
JDBCDataSourceConfigurationFactory.newInstance(pipelineConfig.getTarget().getType(),
pipelineConfig.getTarget().getParameter());
initTableData(JDBCDataSourceCreatorFactory.getInstance(targetConfig.getType()).createDataSource(targetConfig.getDataSourceConfiguration()));
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManagerTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManagerTest.java
index 7b874a4..a561826 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManagerTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManagerTest.java
@@ -46,7 +46,7 @@ public final class DataSourceManagerTest {
public void assertGetDataSource() {
DataSourceManager dataSourceManager = new DataSourceManager();
DataSource actual = dataSourceManager.getDataSource(
-
JDBCDataSourceConfigurationFactory.newInstance(jobConfig.getRuleConfig().getSource().getType(),
jobConfig.getRuleConfig().getSource().getParameter()));
+
JDBCDataSourceConfigurationFactory.newInstance(jobConfig.getPipelineConfig().getSource().getType(),
jobConfig.getPipelineConfig().getSource().getParameter()));
assertThat(actual, instanceOf(DataSourceWrapper.class));
}
@@ -54,9 +54,9 @@ public final class DataSourceManagerTest {
public void assertClose() throws NoSuchFieldException,
IllegalAccessException {
try (DataSourceManager dataSourceManager = new DataSourceManager()) {
dataSourceManager.createSourceDataSource(
-
JDBCDataSourceConfigurationFactory.newInstance(jobConfig.getRuleConfig().getSource().getType(),
jobConfig.getRuleConfig().getSource().getParameter()));
+
JDBCDataSourceConfigurationFactory.newInstance(jobConfig.getPipelineConfig().getSource().getType(),
jobConfig.getPipelineConfig().getSource().getParameter()));
dataSourceManager.createTargetDataSource(
-
JDBCDataSourceConfigurationFactory.newInstance(jobConfig.getRuleConfig().getTarget().getType(),
jobConfig.getRuleConfig().getTarget().getParameter()));
+
JDBCDataSourceConfigurationFactory.newInstance(jobConfig.getPipelineConfig().getTarget().getType(),
jobConfig.getPipelineConfig().getTarget().getParameter()));
Map<?, ?> cachedDataSources =
ReflectionUtil.getFieldValue(dataSourceManager, "cachedDataSources", Map.class);
assertNotNull(cachedDataSources);
assertThat(cachedDataSources.size(), is(2));
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ResourceUtil.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ResourceUtil.java
index 1aeab5e..285f2dc 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ResourceUtil.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ResourceUtil.java
@@ -21,7 +21,7 @@ import lombok.SneakyThrows;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.jdbc.config.JDBCDataSourceConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.jdbc.config.impl.ShardingSphereJDBCDataSourceConfiguration;
@@ -61,10 +61,10 @@ public final class ResourceUtil {
JobConfiguration result = new JobConfiguration();
WorkflowConfiguration workflowConfig = new
WorkflowConfiguration("logic_db",
Collections.singletonList(YamlShardingRuleConfiguration.class.getName()),
"id1");
result.setWorkflowConfig(workflowConfig);
- RuleConfiguration ruleConfig = new RuleConfiguration();
- result.setRuleConfig(ruleConfig);
- ruleConfig.setSource(createYamlJDBCDataSourceConfiguration(new
ShardingSphereJDBCDataSourceConfiguration(readFileToString("/config_sharding_sphere_jdbc_source.yaml"))));
- ruleConfig.setTarget(createYamlJDBCDataSourceConfiguration(new
StandardJDBCDataSourceConfiguration(readFileToString("/config_standard_jdbc_target.yaml"))));
+ PipelineConfiguration pipelineConfig = new PipelineConfiguration();
+ result.setPipelineConfig(pipelineConfig);
+ pipelineConfig.setSource(createYamlJDBCDataSourceConfiguration(new
ShardingSphereJDBCDataSourceConfiguration(readFileToString("/config_sharding_sphere_jdbc_source.yaml"))));
+ pipelineConfig.setTarget(createYamlJDBCDataSourceConfiguration(new
StandardJDBCDataSourceConfiguration(readFileToString("/config_standard_jdbc_target.yaml"))));
result.buildHandleConfig();
return result;
}