Lucas-307 commented on a change in pull request #6883:
URL: https://github.com/apache/shardingsphere/pull/6883#discussion_r471204010



##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
##########
@@ -44,79 +45,79 @@
  * Synchronize position resumer.
  */
 public final class SyncPositionResumer {
-    
+
     private final SyncTaskFactory syncTaskFactory = new 
DefaultSyncTaskFactory();
-    
+
     /**
      * Resume position from resume from break-point manager.
      *
-     * @param shardingScalingJob sharding scaling job
-     * @param dataSourceManager dataSource manager
+     * @param shardingScalingJob      sharding scaling job
+     * @param dataSourceManager       dataSource manager
      * @param resumeBreakPointManager resume from break-point manager
      */
     public void resumePosition(final ShardingScalingJob shardingScalingJob, 
final DataSourceManager dataSourceManager, final ResumeBreakPointManager 
resumeBreakPointManager) {
         resumeInventoryPosition(shardingScalingJob, dataSourceManager, 
resumeBreakPointManager);
         resumeIncrementalPosition(shardingScalingJob, resumeBreakPointManager);
     }
-    
+
     private void resumeInventoryPosition(final ShardingScalingJob 
shardingScalingJob, final DataSourceManager dataSourceManager, final 
ResumeBreakPointManager resumeBreakPointManager) {
         List<ScalingTask<InventoryPosition>> allInventoryDataTasks = 
getAllInventoryDataTasks(shardingScalingJob, dataSourceManager, 
resumeBreakPointManager);
         for (Collection<ScalingTask<InventoryPosition>> each : 
JobPrepareUtil.groupInventoryDataTasks(shardingScalingJob.getSyncConfigurations().get(0).getConcurrency(),
 allInventoryDataTasks)) {
             
shardingScalingJob.getInventoryDataTasks().add(syncTaskFactory.createInventoryDataSyncTaskGroup(each));
         }
     }
-    
+
     private List<ScalingTask<InventoryPosition>> getAllInventoryDataTasks(
             final ShardingScalingJob shardingScalingJob, final 
DataSourceManager dataSourceManager, final ResumeBreakPointManager 
resumeBreakPointManager) {
         List<ScalingTask<InventoryPosition>> result = new LinkedList<>();
         for (SyncConfiguration each : 
shardingScalingJob.getSyncConfigurations()) {
             MetaDataManager metaDataManager = new 
MetaDataManager(dataSourceManager.getDataSource(each.getDumperConfiguration().getDataSourceConfiguration()));
             for (Entry<String, PositionManager<InventoryPosition>> entry : 
getInventoryPositionMap(each.getDumperConfiguration(), 
resumeBreakPointManager).entrySet()) {
-                
result.add(syncTaskFactory.createInventoryDataSyncTask(newSyncConfiguration(each,
 metaDataManager, entry)));
+                
result.add(syncTaskFactory.createInventoryDataSyncTask(newInventoryDumperConfiguration(each.getDumperConfiguration(),
 metaDataManager, entry), each.getImporterConfiguration()));
             }
         }
         return result;
     }
-    
-    private SyncConfiguration newSyncConfiguration(final SyncConfiguration 
syncConfiguration, final MetaDataManager metaDataManager, final Entry<String, 
PositionManager<InventoryPosition>> entry) {
+
+    private InventoryDumperConfiguration newInventoryDumperConfiguration(final 
DumperConfiguration dumperConfiguration, final MetaDataManager metaDataManager,
+                                                                final 
Entry<String, PositionManager<InventoryPosition>> entry) {
         String[] splitTable = entry.getKey().split("#");
-        RdbmsConfiguration splitDumperConfig = 
RdbmsConfiguration.clone(syncConfiguration.getDumperConfiguration());
+        InventoryDumperConfiguration splitDumperConfig = new 
InventoryDumperConfiguration(dumperConfiguration);
         splitDumperConfig.setTableName(splitTable[0].split("\\.")[1]);
         splitDumperConfig.setPositionManager(entry.getValue());
         if (2 == splitTable.length) {
             splitDumperConfig.setSpiltNum(Integer.parseInt(splitTable[1]));
         }
         
splitDumperConfig.setPrimaryKey(metaDataManager.getTableMetaData(splitDumperConfig.getTableName()).getPrimaryKeyColumns().get(0));
-        return new SyncConfiguration(syncConfiguration.getConcurrency(), 
syncConfiguration.getTableNameMap(),
-                splitDumperConfig, 
RdbmsConfiguration.clone(syncConfiguration.getImporterConfiguration()));
+        return splitDumperConfig;
     }
-    
+
     private Map<String, PositionManager<InventoryPosition>> 
getInventoryPositionMap(
-            final RdbmsConfiguration dumperConfiguration, final 
ResumeBreakPointManager resumeBreakPointManager) {
+            final DumperConfiguration dumperConfiguration, final 
ResumeBreakPointManager resumeBreakPointManager) {
         Pattern pattern = Pattern.compile(String.format("%s\\.\\w+(#\\d+)?", 
dumperConfiguration.getDataSourceName()));
         return 
resumeBreakPointManager.getInventoryPositionManagerMap().entrySet().stream()
                 .filter(entry -> pattern.matcher(entry.getKey()).find())
                 .collect(Collectors.toMap(Entry::getKey, Map.Entry::getValue));
     }
-    
+
     private void resumeIncrementalPosition(final ShardingScalingJob 
shardingScalingJob, final ResumeBreakPointManager resumeBreakPointManager) {
         for (SyncConfiguration each : 
shardingScalingJob.getSyncConfigurations()) {
             
each.getDumperConfiguration().setPositionManager(resumeBreakPointManager.getIncrementalPositionManagerMap().get(each.getDumperConfiguration().getDataSourceName()));
-            
shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each));
+            
shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each.getConcurrency(),
 each.getDumperConfiguration(), each.getImporterConfiguration()));
         }
     }
-    
+
     /**
      * Persist position.
      *
-     * @param shardingScalingJob sharding scaling job
+     * @param shardingScalingJob      sharding scaling job

Review comment:
       Parameters do not have to be aligned.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to