Lucas-307 commented on a change in pull request #6883:
URL: https://github.com/apache/shardingsphere/pull/6883#discussion_r471204010
##########
File path:
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
##########
@@ -44,79 +45,79 @@
* Synchronize position resumer.
*/
public final class SyncPositionResumer {
-
+
private final SyncTaskFactory syncTaskFactory = new
DefaultSyncTaskFactory();
-
+
/**
* Resume position from resume from break-point manager.
*
- * @param shardingScalingJob sharding scaling job
- * @param dataSourceManager dataSource manager
+ * @param shardingScalingJob sharding scaling job
+ * @param dataSourceManager dataSource manager
* @param resumeBreakPointManager resume from break-point manager
*/
public void resumePosition(final ShardingScalingJob shardingScalingJob,
final DataSourceManager dataSourceManager, final ResumeBreakPointManager
resumeBreakPointManager) {
resumeInventoryPosition(shardingScalingJob, dataSourceManager,
resumeBreakPointManager);
resumeIncrementalPosition(shardingScalingJob, resumeBreakPointManager);
}
-
+
private void resumeInventoryPosition(final ShardingScalingJob
shardingScalingJob, final DataSourceManager dataSourceManager, final
ResumeBreakPointManager resumeBreakPointManager) {
List<ScalingTask<InventoryPosition>> allInventoryDataTasks =
getAllInventoryDataTasks(shardingScalingJob, dataSourceManager,
resumeBreakPointManager);
for (Collection<ScalingTask<InventoryPosition>> each :
JobPrepareUtil.groupInventoryDataTasks(shardingScalingJob.getSyncConfigurations().get(0).getConcurrency(),
allInventoryDataTasks)) {
shardingScalingJob.getInventoryDataTasks().add(syncTaskFactory.createInventoryDataSyncTaskGroup(each));
}
}
-
+
private List<ScalingTask<InventoryPosition>> getAllInventoryDataTasks(
final ShardingScalingJob shardingScalingJob, final
DataSourceManager dataSourceManager, final ResumeBreakPointManager
resumeBreakPointManager) {
List<ScalingTask<InventoryPosition>> result = new LinkedList<>();
for (SyncConfiguration each :
shardingScalingJob.getSyncConfigurations()) {
MetaDataManager metaDataManager = new
MetaDataManager(dataSourceManager.getDataSource(each.getDumperConfiguration().getDataSourceConfiguration()));
for (Entry<String, PositionManager<InventoryPosition>> entry :
getInventoryPositionMap(each.getDumperConfiguration(),
resumeBreakPointManager).entrySet()) {
-
result.add(syncTaskFactory.createInventoryDataSyncTask(newSyncConfiguration(each,
metaDataManager, entry)));
+
result.add(syncTaskFactory.createInventoryDataSyncTask(newInventoryDumperConfiguration(each.getDumperConfiguration(),
metaDataManager, entry), each.getImporterConfiguration()));
}
}
return result;
}
-
- private SyncConfiguration newSyncConfiguration(final SyncConfiguration
syncConfiguration, final MetaDataManager metaDataManager, final Entry<String,
PositionManager<InventoryPosition>> entry) {
+
+ private InventoryDumperConfiguration newInventoryDumperConfiguration(final
DumperConfiguration dumperConfiguration, final MetaDataManager metaDataManager,
+ final
Entry<String, PositionManager<InventoryPosition>> entry) {
String[] splitTable = entry.getKey().split("#");
- RdbmsConfiguration splitDumperConfig =
RdbmsConfiguration.clone(syncConfiguration.getDumperConfiguration());
+ InventoryDumperConfiguration splitDumperConfig = new
InventoryDumperConfiguration(dumperConfiguration);
splitDumperConfig.setTableName(splitTable[0].split("\\.")[1]);
splitDumperConfig.setPositionManager(entry.getValue());
if (2 == splitTable.length) {
splitDumperConfig.setSpiltNum(Integer.parseInt(splitTable[1]));
}
splitDumperConfig.setPrimaryKey(metaDataManager.getTableMetaData(splitDumperConfig.getTableName()).getPrimaryKeyColumns().get(0));
- return new SyncConfiguration(syncConfiguration.getConcurrency(),
syncConfiguration.getTableNameMap(),
- splitDumperConfig,
RdbmsConfiguration.clone(syncConfiguration.getImporterConfiguration()));
+ return splitDumperConfig;
}
-
+
private Map<String, PositionManager<InventoryPosition>>
getInventoryPositionMap(
- final RdbmsConfiguration dumperConfiguration, final
ResumeBreakPointManager resumeBreakPointManager) {
+ final DumperConfiguration dumperConfiguration, final
ResumeBreakPointManager resumeBreakPointManager) {
Pattern pattern = Pattern.compile(String.format("%s\\.\\w+(#\\d+)?",
dumperConfiguration.getDataSourceName()));
return
resumeBreakPointManager.getInventoryPositionManagerMap().entrySet().stream()
.filter(entry -> pattern.matcher(entry.getKey()).find())
.collect(Collectors.toMap(Entry::getKey, Map.Entry::getValue));
}
-
+
private void resumeIncrementalPosition(final ShardingScalingJob
shardingScalingJob, final ResumeBreakPointManager resumeBreakPointManager) {
for (SyncConfiguration each :
shardingScalingJob.getSyncConfigurations()) {
each.getDumperConfiguration().setPositionManager(resumeBreakPointManager.getIncrementalPositionManagerMap().get(each.getDumperConfiguration().getDataSourceName()));
-
shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each));
+
shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each.getConcurrency(),
each.getDumperConfiguration(), each.getImporterConfiguration()));
}
}
-
+
/**
* Persist position.
*
- * @param shardingScalingJob sharding scaling job
+ * @param shardingScalingJob sharding scaling job
Review comment:
Parameters do not have to be aligned.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]