This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new fc3699caa11 Remove jobConfig from TaskConfiguration (#19751)
fc3699caa11 is described below

commit fc3699caa11ade2017a83879c49469d1803f0ab5
Author: Da Xiang Huang <[email protected]>
AuthorDate: Mon Aug 1 19:33:18 2022 +0800

    Remove jobConfig from TaskConfiguration (#19751)
---
 .../data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java   | 5 +++--
 .../data/pipeline/api/config/rulealtered/ImporterConfiguration.java  | 2 ++
 .../data/pipeline/api/config/rulealtered/TaskConfiguration.java      | 2 --
 .../data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java   | 4 ++--
 .../data/pipeline/core/importer/AbstractImporterTest.java            | 2 +-
 5 files changed, 8 insertions(+), 7 deletions(-)

diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
index abbba0e395e..51187ddb8ae 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -172,7 +172,7 @@ public final class 
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
         Set<LogicTableName> reShardNeededTables = 
jobConfig.splitLogicTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet());
         Map<LogicTableName, Set<String>> shardingColumnsMap = 
getShardingColumnsMap(targetRuleConfig.orElse(sourceRuleConfig), 
reShardNeededTables);
         ImporterConfiguration importerConfig = 
createImporterConfiguration(jobConfig, onRuleAlteredActionConfig, 
shardingColumnsMap, tableNameSchemaNameMapping);
-        TaskConfiguration result = new TaskConfiguration(jobConfig, 
dumperConfig, importerConfig);
+        TaskConfiguration result = new TaskConfiguration(dumperConfig, 
importerConfig);
         log.info("createTaskConfiguration, dataSourceName={}, result={}", 
dataSourceName, result);
         return result;
     }
@@ -242,7 +242,8 @@ public final class 
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
         PipelineDataSourceConfiguration dataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(),
 jobConfig.getTarget().getParameter());
         int batchSize = onRuleAlteredActionConfig.getOutput().getBatchSize();
         int retryTimes = jobConfig.getRetryTimes();
-        return new ImporterConfiguration(dataSourceConfig, 
unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, 
retryTimes);
+        int concurrency = jobConfig.getConcurrency();
+        return new ImporterConfiguration(dataSourceConfig, 
unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, 
retryTimes, concurrency);
     }
     
     private static Map<LogicTableName, Set<String>> unmodifiable(final 
Map<LogicTableName, Set<String>> shardingColumnsMap) {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/ImporterConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/ImporterConfiguration.java
index af26e11e7cd..ee03e5d0c36 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/ImporterConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/ImporterConfiguration.java
@@ -52,6 +52,8 @@ public final class ImporterConfiguration {
     
     private final int retryTimes;
     
+    private final int concurrency;
+    
     /**
      * Get logic table names.
      *
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
index e2f6e10b356..5356a162b6e 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
@@ -30,8 +30,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 @ToString
 public final class TaskConfiguration {
     
-    private final RuleAlteredJobConfiguration jobConfig;
-    
     private final DumperConfiguration dumperConfig;
     
     private final ImporterConfiguration importerConfig;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 03404707be8..7c0f1716a4a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -194,7 +194,7 @@ public final class RuleAlteredJobPreparer {
         
taskConfig.getDumperConfig().setPosition(getIncrementalPosition(jobContext, 
taskConfig, dataSourceManager));
         PipelineTableMetaDataLoader sourceMetaDataLoader = 
jobContext.getSourceMetaDataLoader();
         AsyncPipelineJobPersistCallback persistCallback = new 
AsyncPipelineJobPersistCallback(jobContext.getJobId(), 
jobContext.getShardingItem());
-        IncrementalTask incrementalTask = new 
IncrementalTask(taskConfig.getJobConfig().getConcurrency(),
+        IncrementalTask incrementalTask = new 
IncrementalTask(taskConfig.getImporterConfig().getConcurrency(),
                 taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), 
pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader, 
incrementalDumperExecuteEngine, persistCallback);
         jobContext.getIncrementalTasks().add(incrementalTask);
     }
@@ -207,7 +207,7 @@ public final class RuleAlteredJobPreparer {
                 return position.get();
             }
         }
-        String databaseType = 
taskConfig.getJobConfig().getSourceDatabaseType();
+        String databaseType = 
taskConfig.getDumperConfig().getDataSourceConfig().getDatabaseType().getType();
         DataSource dataSource = 
dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig());
         return 
PositionInitializerFactory.getInstance(databaseType).init(dataSource);
     }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
index 14406182f8b..22b9c7c2b70 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
@@ -177,6 +177,6 @@ public final class AbstractImporterTest {
     
     private ImporterConfiguration mockImporterConfiguration() {
         Map<LogicTableName, Set<String>> shardingColumnsMap = 
Collections.singletonMap(new LogicTableName("test_table"), 
Collections.singleton("user"));
-        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, 
new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, 3);
+        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, 
new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, 3, 3);
     }
 }

Reply via email to