This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fc3699caa11 Remove jobConfig from TaskConfiguration (#19751)
fc3699caa11 is described below
commit fc3699caa11ade2017a83879c49469d1803f0ab5
Author: Da Xiang Huang <[email protected]>
AuthorDate: Mon Aug 1 19:33:18 2022 +0800
Remove jobConfig from TaskConfiguration (#19751)
---
.../data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java | 5 +++--
.../data/pipeline/api/config/rulealtered/ImporterConfiguration.java | 2 ++
.../data/pipeline/api/config/rulealtered/TaskConfiguration.java | 2 --
.../data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java | 4 ++--
.../data/pipeline/core/importer/AbstractImporterTest.java | 2 +-
5 files changed, 8 insertions(+), 7 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
index abbba0e395e..51187ddb8ae 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -172,7 +172,7 @@ public final class
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
Set<LogicTableName> reShardNeededTables =
jobConfig.splitLogicTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet());
Map<LogicTableName, Set<String>> shardingColumnsMap =
getShardingColumnsMap(targetRuleConfig.orElse(sourceRuleConfig),
reShardNeededTables);
ImporterConfiguration importerConfig =
createImporterConfiguration(jobConfig, onRuleAlteredActionConfig,
shardingColumnsMap, tableNameSchemaNameMapping);
- TaskConfiguration result = new TaskConfiguration(jobConfig,
dumperConfig, importerConfig);
+ TaskConfiguration result = new TaskConfiguration(dumperConfig,
importerConfig);
log.info("createTaskConfiguration, dataSourceName={}, result={}",
dataSourceName, result);
return result;
}
@@ -242,7 +242,8 @@ public final class
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
PipelineDataSourceConfiguration dataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(),
jobConfig.getTarget().getParameter());
int batchSize = onRuleAlteredActionConfig.getOutput().getBatchSize();
int retryTimes = jobConfig.getRetryTimes();
- return new ImporterConfiguration(dataSourceConfig,
unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize,
retryTimes);
+ int concurrency = jobConfig.getConcurrency();
+ return new ImporterConfiguration(dataSourceConfig,
unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize,
retryTimes, concurrency);
}
private static Map<LogicTableName, Set<String>> unmodifiable(final
Map<LogicTableName, Set<String>> shardingColumnsMap) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/ImporterConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/ImporterConfiguration.java
index af26e11e7cd..ee03e5d0c36 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/ImporterConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/ImporterConfiguration.java
@@ -52,6 +52,8 @@ public final class ImporterConfiguration {
private final int retryTimes;
+ private final int concurrency;
+
/**
* Get logic table names.
*
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
index e2f6e10b356..5356a162b6e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
@@ -30,8 +30,6 @@ import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
@ToString
public final class TaskConfiguration {
- private final RuleAlteredJobConfiguration jobConfig;
-
private final DumperConfiguration dumperConfig;
private final ImporterConfiguration importerConfig;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 03404707be8..7c0f1716a4a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -194,7 +194,7 @@ public final class RuleAlteredJobPreparer {
taskConfig.getDumperConfig().setPosition(getIncrementalPosition(jobContext,
taskConfig, dataSourceManager));
PipelineTableMetaDataLoader sourceMetaDataLoader =
jobContext.getSourceMetaDataLoader();
AsyncPipelineJobPersistCallback persistCallback = new
AsyncPipelineJobPersistCallback(jobContext.getJobId(),
jobContext.getShardingItem());
- IncrementalTask incrementalTask = new
IncrementalTask(taskConfig.getJobConfig().getConcurrency(),
+ IncrementalTask incrementalTask = new
IncrementalTask(taskConfig.getImporterConfig().getConcurrency(),
taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader,
incrementalDumperExecuteEngine, persistCallback);
jobContext.getIncrementalTasks().add(incrementalTask);
}
@@ -207,7 +207,7 @@ public final class RuleAlteredJobPreparer {
return position.get();
}
}
- String databaseType =
taskConfig.getJobConfig().getSourceDatabaseType();
+ String databaseType =
taskConfig.getDumperConfig().getDataSourceConfig().getDatabaseType().getType();
DataSource dataSource =
dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig());
return
PositionInitializerFactory.getInstance(databaseType).init(dataSource);
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
index 14406182f8b..22b9c7c2b70 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
@@ -177,6 +177,6 @@ public final class AbstractImporterTest {
private ImporterConfiguration mockImporterConfiguration() {
Map<LogicTableName, Set<String>> shardingColumnsMap =
Collections.singletonMap(new LogicTableName("test_table"),
Collections.singleton("user"));
- return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap,
new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, 3);
+ return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap,
new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, 3, 3);
}
}