This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e0934fcef06 Refactor MigrationJobAPI (#29238)
e0934fcef06 is described below
commit e0934fcef0689e93523c7738444752e4a34f31cb
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 30 15:54:27 2023 +0800
Refactor MigrationJobAPI (#29238)
---
.../scenario/migration/api/MigrationJobAPI.java | 22 +++++++++++++++-------
1 file changed, 15 insertions(+), 7 deletions(-)
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 712fcef2d63..4412612d67f 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -93,9 +93,18 @@ import java.util.stream.Collectors;
@Slf4j
public final class MigrationJobAPI implements TransmissionJobAPI {
- private final PipelineDataSourcePersistService dataSourcePersistService =
new PipelineDataSourcePersistService();
+ private final PipelineJobManager jobManager;
- private final PipelineJobOption jobOption = new MigrationJobOption();
+ private final PipelineJobConfigurationManager jobConfigManager;
+
+ private final PipelineDataSourcePersistService dataSourcePersistService;
+
+ public MigrationJobAPI() {
+ PipelineJobOption jobOption = new MigrationJobOption();
+ jobManager = new PipelineJobManager(jobOption);
+ jobConfigManager = new PipelineJobConfigurationManager(jobOption);
+ dataSourcePersistService = new PipelineDataSourcePersistService();
+ }
/**
* Start migration job.
@@ -106,7 +115,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
*/
public String start(final PipelineContextKey contextKey, final
MigrateTableStatement param) {
MigrationJobConfiguration jobConfig = new
YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey,
param));
- new PipelineJobManager(jobOption).start(jobConfig);
+ jobManager.start(jobConfig);
return jobConfig.getJobId();
}
@@ -269,10 +278,9 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
public void commit(final String jobId) {
log.info("Commit job {}", jobId);
final long startTimeMillis = System.currentTimeMillis();
- PipelineJobManager jobManager = new PipelineJobManager(jobOption);
jobManager.stop(jobId);
dropCheckJobs(jobId);
- MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(jobOption).getJobConfiguration(jobId);
+ MigrationJobConfiguration jobConfig =
jobConfigManager.getJobConfiguration(jobId);
refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
jobManager.drop(jobId);
log.info("Commit cost {} ms", System.currentTimeMillis() -
startTimeMillis);
@@ -290,7 +298,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
final long startTimeMillis = System.currentTimeMillis();
dropCheckJobs(jobId);
cleanTempTableOnRollback(jobId);
- new PipelineJobManager(jobOption).drop(jobId);
+ jobManager.drop(jobId);
log.info("Rollback job {} cost {} ms", jobId,
System.currentTimeMillis() - startTimeMillis);
}
@@ -301,7 +309,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
}
for (String each : checkJobIds) {
try {
- new PipelineJobManager(jobOption).drop(each);
+ jobManager.drop(each);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON