This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fbbb8822034 Merge MigrationJobAPI and MigrationJobManager (#29218)
fbbb8822034 is described below
commit fbbb88220342ca0b1a5da1e22681f2b2f6aa11ee
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Nov 27 21:54:09 2023 +0800
Merge MigrationJobAPI and MigrationJobManager (#29218)
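    The DistSQL handlers now obtain the migration job API through SPI lookup instead of instantiating a
    separate manager. A minimal sketch of the handler-side change, taken from the field initialization
    shown in the diff below (surrounding code omitted):

        // Before: direct instantiation of the removed MigrationJobManager
        private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption());

        // After: SPI lookup of the merged MigrationJobAPI
        private final MigrationJobAPI jobAPI = (MigrationJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");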
---
.../ShowMigrationSourceStorageUnitsExecutor.java | 9 +-
.../handler/update/MigrateTableUpdater.java | 11 +-
.../RegisterMigrationSourceStorageUnitUpdater.java | 9 +-
...nregisterMigrationSourceStorageUnitUpdater.java | 9 +-
.../migration/api/impl/MigrationJobAPI.java | 226 +++++++++++++++++-
.../migration/api/impl/MigrationJobManager.java | 261 ---------------------
.../migration/api/impl/MigrationJobAPITest.java | 20 +-
7 files changed, 251 insertions(+), 294 deletions(-)
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
index bab525e6822..ca2d229bb4b 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
@@ -18,11 +18,12 @@
 package org.apache.shardingsphere.migration.distsql.handler.query;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
+import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationSourceStorageUnitsStatement;
 import java.util.Arrays;
@@ -36,11 +37,11 @@ import java.util.List;
  */
 public final class ShowMigrationSourceStorageUnitsExecutor implements QueryableRALExecutor<ShowMigrationSourceStorageUnitsStatement> {
-    private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption());
+    private final MigrationJobAPI jobAPI = (MigrationJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationSourceStorageUnitsStatement sqlStatement) {
-        Iterator<Collection<Object>> data = jobManager.listMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY)).iterator();
+        Iterator<Collection<Object>> data = jobAPI.listMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY)).iterator();
         Collection<LocalDataQueryResultRow> result = new LinkedList<>();
         while (data.hasNext()) {
             result.add(new LocalDataQueryResultRow((List<Object>) data.next()));
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
index 23f220d73fb..3d0214752d2 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
@@ -17,29 +17,28 @@
 package org.apache.shardingsphere.migration.distsql.handler.update;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequiredTargetDatabaseException;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
+import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
 /**
  * Migrate table updater.
  */
-@Slf4j
 public final class MigrateTableUpdater implements RALUpdater<MigrateTableStatement> {
-    private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption());
+    private final MigrationJobAPI jobAPI = (MigrationJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
     @Override
     public void executeUpdate(final String databaseName, final MigrateTableStatement sqlStatement) {
         String targetDatabaseName = null == sqlStatement.getTargetDatabaseName() ? databaseName : sqlStatement.getTargetDatabaseName();
         ShardingSpherePreconditions.checkNotNull(targetDatabaseName, MissingRequiredTargetDatabaseException::new);
-        jobManager.start(new PipelineContextKey(InstanceType.PROXY), new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName));
+        jobAPI.start(new PipelineContextKey(InstanceType.PROXY), new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName));
     }
     @Override
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
index d26fee0d0a1..346d76ff1c0 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
@@ -18,8 +18,8 @@
 package org.apache.shardingsphere.migration.distsql.handler.update;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
+import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import org.apache.shardingsphere.distsql.handler.validate.DataSourcePoolPropertiesValidateHandler;
 import org.apache.shardingsphere.distsql.segment.DataSourceSegment;
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePo
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.migration.distsql.statement.RegisterMigrationSourceStorageUnitStatement;
 import java.util.ArrayList;
@@ -43,7 +44,7 @@ import java.util.Map;
  */
 public final class RegisterMigrationSourceStorageUnitUpdater implements RALUpdater<RegisterMigrationSourceStorageUnitStatement> {
-    private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption());
+    private final MigrationJobAPI jobAPI = (MigrationJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
     private final DataSourcePoolPropertiesValidateHandler validateHandler = new DataSourcePoolPropertiesValidateHandler();
@@ -56,7 +57,7 @@ public final class RegisterMigrationSourceStorageUnitUpdater implements RALUpdat
         DatabaseType databaseType = DatabaseTypeFactory.get(urlBasedDataSourceSegment.getUrl());
         Map<String, DataSourcePoolProperties> propsMap = DataSourceSegmentsConverter.convert(databaseType, dataSources);
         validateHandler.validate(propsMap);
-        jobManager.addMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), propsMap);
+        jobAPI.addMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), propsMap);
     }
     @Override
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
index 3f4cdfe20d2..77d6f4e3b34 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
@@ -18,10 +18,11 @@
 package org.apache.shardingsphere.migration.distsql.handler.update;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
+import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.migration.distsql.statement.UnregisterMigrationSourceStorageUnitStatement;
 /**
@@ -29,11 +30,11 @@ import org.apache.shardingsphere.migration.distsql.statement.UnregisterMigration
  */
 public final class UnregisterMigrationSourceStorageUnitUpdater implements RALUpdater<UnregisterMigrationSourceStorageUnitStatement> {
-    private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption());
+    private final MigrationJobAPI jobAPI = (MigrationJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
     @Override
     public void executeUpdate(final String databaseName, final UnregisterMigrationSourceStorageUnitStatement sqlStatement) {
-        jobManager.dropMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames());
+        jobAPI.dropMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames());
     }
     @Override
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index b89e63824d3..254aa881369 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -18,27 +18,73 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
+import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
+import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
+import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
 import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException;
+import org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException;
+import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
+import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
+import org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
+import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
+import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
+import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.json.JsonUtils;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
+import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
+import org.apache.shardingsphere.migration.distsql.statement.pojo.SourceTargetEntry;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
 /**
  * Migration job API.
@@ -46,11 +92,185 @@ import java.util.Collection;
 @Slf4j
 public final class MigrationJobAPI implements TransmissionJobAPI {
+    private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService();
+
+    private final PipelineJobOption jobOption = new MigrationJobOption();
+
+    /**
+     * Start migration job.
+     *
+     * @param contextKey context key
+     * @param param create migration job parameter
+     * @return job id
+     */
+    public String start(final PipelineContextKey contextKey, final MigrateTableStatement param) {
+        MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, param));
+        new PipelineJobManager(jobOption).start(jobConfig);
+        return jobConfig.getJobId();
+    }
+
+    private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final MigrateTableStatement param) {
+        YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
+        result.setTargetDatabaseName(param.getTargetDatabaseName());
+        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION");
+        Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>();
+        Map<String, YamlPipelineDataSourceConfiguration> configSources = new LinkedHashMap<>();
+        List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
+                .thenComparing(each -> DataNodeUtils.formatWithSchema(each.getSource()))).collect(Collectors.toList());
+        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
+        for (SourceTargetEntry each : sourceTargetEntries) {
+            sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> new LinkedList<>()).add(each.getSource());
+            ShardingSpherePreconditions.checkState(1 == sourceDataNodes.get(each.getTargetTableName()).size(),
+                    () -> new PipelineInvalidParameterException("more than one source table for " + each.getTargetTableName()));
+            String dataSourceName = each.getSource().getDataSourceName();
+            if (configSources.containsKey(dataSourceName)) {
+                continue;
+            }
+            ShardingSpherePreconditions.checkState(metaDataDataSource.containsKey(dataSourceName),
+                    () -> new PipelineInvalidParameterException(dataSourceName + " doesn't exist. Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it."));
+            Map<String, Object> sourceDataSourcePoolProps = dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName));
+            StandardPipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps);
+            configSources.put(dataSourceName, buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), sourceDataSourceConfig.getParameter()));
+            DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(sourceDataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
+            if (null == each.getSource().getSchemaName() && dialectDatabaseMetaData.isSchemaAvailable()) {
+                each.getSource().setSchemaName(PipelineSchemaUtils.getDefaultSchema(sourceDataSourceConfig));
+            }
+            DatabaseType sourceDatabaseType = sourceDataSourceConfig.getDatabaseType();
+            if (null == result.getSourceDatabaseType()) {
+                result.setSourceDatabaseType(sourceDatabaseType.getType());
+            } else if (!result.getSourceDatabaseType().equals(sourceDatabaseType.getType())) {
+                throw new PipelineInvalidParameterException("Source storage units have different database types");
+            }
+        }
+        result.setSources(configSources);
+        ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName());
+        PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(targetDatabase);
+        result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(), targetPipelineDataSourceConfig.getParameter()));
+        result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
+        List<JobDataNodeEntry> tablesFirstDataNodes = sourceDataNodes.entrySet().stream()
+                .map(entry -> new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1))).collect(Collectors.toList());
+        result.setTargetTableNames(new ArrayList<>(sourceDataNodes.keySet()).stream().sorted().collect(Collectors.toList()));
+        result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
+        result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
+        result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
+        ((TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, getType()).getOption()).extendYamlJobConfiguration(contextKey, result);
+        return result;
+    }
+
+    private YamlPipelineDataSourceConfiguration buildYamlPipelineDataSourceConfiguration(final String type, final String param) {
+        YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
+        result.setType(type);
+        result.setParameter(param);
+        return result;
+    }
+
+    private PipelineDataSourceConfiguration buildTargetPipelineDataSourceConfiguration(final ShardingSphereDatabase targetDatabase) {
+        Map<String, Map<String, Object>> targetPoolProps = new HashMap<>();
+        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
+        for (Entry<String, StorageUnit> entry : targetDatabase.getResourceMetaData().getStorageUnits().entrySet()) {
+            targetPoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
+        }
+        YamlRootConfiguration targetRootConfig = buildYamlRootConfiguration(targetDatabase.getName(), targetPoolProps, targetDatabase.getRuleMetaData().getConfigurations());
+        return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
+    }
+
+    private YamlRootConfiguration buildYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final Collection<RuleConfiguration> rules) {
+        ShardingSpherePreconditions.checkState(!rules.isEmpty(), () -> new NoAnyRuleExistsException(databaseName));
+        YamlRootConfiguration result = new YamlRootConfiguration();
+        result.setDatabaseName(databaseName);
+        result.setDataSources(yamlDataSources);
+        result.setRules(new YamlRuleConfigurationSwapperEngine().swapToYamlRuleConfigurations(rules));
+        return result;
+    }
+
+    private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<DataNode>> sourceDataNodes) {
+        Map<String, String> result = new LinkedHashMap<>();
+        sourceDataNodes.forEach((tableName, dataNodes) -> result.put(tableName, dataNodes.get(0).getSchemaName()));
+        return result;
+    }
+
+    /**
+     * Add migration source resources.
+     *
+     * @param contextKey context key
+     * @param propsMap data source pool properties map
+     */
+    public void addMigrationSourceResources(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
+        Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getType());
+        Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F);
+        for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
+            if (existDataSources.containsKey(entry.getKey())) {
+                duplicateDataSourceNames.add(entry.getKey());
+            }
+        }
+        ShardingSpherePreconditions.checkState(duplicateDataSourceNames.isEmpty(), () -> new RegisterMigrationSourceStorageUnitException(duplicateDataSourceNames));
+        Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(existDataSources);
+        result.putAll(propsMap);
+        dataSourcePersistService.persist(contextKey, getType(), result);
+    }
+
+    /**
+     * Drop migration source resources.
+     *
+     * @param contextKey context key
+     * @param resourceNames resource names
+     */
+    public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection<String> resourceNames) {
+        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getType());
+        List<String> noExistResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList());
+        ShardingSpherePreconditions.checkState(noExistResources.isEmpty(), () -> new UnregisterMigrationSourceStorageUnitException(noExistResources));
+        for (String each : resourceNames) {
+            metaDataDataSource.remove(each);
+        }
+        dataSourcePersistService.persist(contextKey, getType(), metaDataDataSource);
+    }
+
+    /**
+     * Query migration source resources list.
+     *
+     * @param contextKey context key
+     * @return migration source resources
+     */
+    public Collection<Collection<Object>> listMigrationSourceResources(final PipelineContextKey contextKey) {
+        Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, getType());
+        Collection<Collection<Object>> result = new ArrayList<>(propsMap.size());
+        for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
+            String dataSourceName = entry.getKey();
+            DataSourcePoolProperties value = entry.getValue();
+            Collection<Object> props = new LinkedList<>();
+            props.add(dataSourceName);
+            String url = String.valueOf(value.getConnectionPropertySynonyms().getStandardProperties().get("url"));
+            DatabaseType databaseType = DatabaseTypeFactory.get(url);
+            props.add(databaseType.getType());
+            ConnectionProperties connectionProps = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, databaseType).parse(url, "", null);
+            props.add(connectionProps.getHostname());
+            props.add(connectionProps.getPort());
+            props.add(connectionProps.getCatalog());
+            Map<String, Object> standardProps = value.getPoolPropertySynonyms().getStandardProperties();
+            props.add(getStandardProperty(standardProps, "connectionTimeoutMilliseconds"));
+            props.add(getStandardProperty(standardProps, "idleTimeoutMilliseconds"));
+            props.add(getStandardProperty(standardProps, "maxLifetimeMilliseconds"));
+            props.add(getStandardProperty(standardProps, "maxPoolSize"));
+            props.add(getStandardProperty(standardProps, "minPoolSize"));
+            props.add(getStandardProperty(standardProps, "readOnly"));
+            Map<String, Object> otherProps = value.getCustomProperties().getProperties();
+            props.add(otherProps.isEmpty() ? "" : JsonUtils.toJsonString(otherProps));
+            result.add(props);
+        }
+        return result;
+    }
+
+    private String getStandardProperty(final Map<String, Object> standardProps, final String key) {
+        if (standardProps.containsKey(key) && null != standardProps.get(key)) {
+            return standardProps.get(key).toString();
+        }
+        return "";
+    }
+
     @Override
     public void commit(final String jobId) {
         log.info("Commit job {}", jobId);
         final long startTimeMillis = System.currentTimeMillis();
-        PipelineJobOption jobOption = new MigrationJobOption();
         PipelineJobManager jobManager = new PipelineJobManager(jobOption);
         jobManager.stop(jobId);
         dropCheckJobs(jobId);
@@ -72,7 +292,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
         final long startTimeMillis = System.currentTimeMillis();
         dropCheckJobs(jobId);
         cleanTempTableOnRollback(jobId);
-        new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, getType()).getOption()).drop(jobId);
+        new PipelineJobManager(jobOption).drop(jobId);
         log.info("Rollback job {} cost {} ms", jobId, System.currentTimeMillis() - startTimeMillis);
     }
@@ -83,7 +303,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
         }
         for (String each : checkJobIds) {
             try {
-                new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, getType()).getOption()).drop(each);
+                new PipelineJobManager(jobOption).drop(each);
                 // CHECKSTYLE:OFF
             } catch (final RuntimeException ex) {
                 // CHECKSTYLE:ON
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobManager.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobManager.java
deleted file mode 100644
index f356ed53733..00000000000
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobManager.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl;
-
-import lombok.extern.slf4j.Slf4j;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
-import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
-import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
-import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
-import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
-import org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException;
-import org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException;
-import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
-import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
-import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
-import org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
-import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
-import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
-import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
-import org.apache.shardingsphere.infra.util.json.JsonUtils;
-import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
-import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
-import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
-import org.apache.shardingsphere.migration.distsql.statement.pojo.SourceTargetEntry;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
-
-/**
- * Migration job manager.
- */
-@Slf4j
-public final class MigrationJobManager {
-
- private final MigrationJobOption jobOption;
-
- private final PipelineJobManager jobManager;
-
- private final PipelineDataSourcePersistService dataSourcePersistService;
-
- public MigrationJobManager(final MigrationJobOption jobOption) {
- this.jobOption = jobOption;
- jobManager = new PipelineJobManager(jobOption);
- dataSourcePersistService = new PipelineDataSourcePersistService();
- }
-
- /**
- * Start migration job.
- *
- * @param contextKey context key
- * @param param create migration job parameter
- * @return job id
- */
-    public String start(final PipelineContextKey contextKey, final MigrateTableStatement param) {
-        MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, param));
-        jobManager.start(jobConfig);
-        return jobConfig.getJobId();
-    }
-
-    private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final MigrateTableStatement param) {
-        YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
-        result.setTargetDatabaseName(param.getTargetDatabaseName());
-        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION");
-        Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>();
-        Map<String, YamlPipelineDataSourceConfiguration> configSources = new LinkedHashMap<>();
-        List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
-                .thenComparing(each -> DataNodeUtils.formatWithSchema(each.getSource()))).collect(Collectors.toList());
-        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
-        for (SourceTargetEntry each : sourceTargetEntries) {
-            sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> new LinkedList<>()).add(each.getSource());
-            ShardingSpherePreconditions.checkState(1 == sourceDataNodes.get(each.getTargetTableName()).size(),
-                    () -> new PipelineInvalidParameterException("more than one source table for " + each.getTargetTableName()));
-            String dataSourceName = each.getSource().getDataSourceName();
-            if (configSources.containsKey(dataSourceName)) {
-                continue;
-            }
-            ShardingSpherePreconditions.checkState(metaDataDataSource.containsKey(dataSourceName),
-                    () -> new PipelineInvalidParameterException(dataSourceName + " doesn't exist. Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it."));
-            Map<String, Object> sourceDataSourcePoolProps = dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName));
-            StandardPipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps);
-            configSources.put(dataSourceName, buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), sourceDataSourceConfig.getParameter()));
-            DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(sourceDataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
-            if (null == each.getSource().getSchemaName() && dialectDatabaseMetaData.isSchemaAvailable()) {
-                each.getSource().setSchemaName(PipelineSchemaUtils.getDefaultSchema(sourceDataSourceConfig));
-            }
-            DatabaseType sourceDatabaseType = sourceDataSourceConfig.getDatabaseType();
-            if (null == result.getSourceDatabaseType()) {
-                result.setSourceDatabaseType(sourceDatabaseType.getType());
-            } else if (!result.getSourceDatabaseType().equals(sourceDatabaseType.getType())) {
-                throw new PipelineInvalidParameterException("Source storage units have different database types");
-            }
-        }
-        result.setSources(configSources);
-        ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName());
-        PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(targetDatabase);
-        result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(), targetPipelineDataSourceConfig.getParameter()));
-        result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
-        List<JobDataNodeEntry> tablesFirstDataNodes = sourceDataNodes.entrySet().stream()
-                .map(entry -> new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1))).collect(Collectors.toList());
-        result.setTargetTableNames(new ArrayList<>(sourceDataNodes.keySet()).stream().sorted().collect(Collectors.toList()));
-        result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
-        result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
-        result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
-        jobOption.extendYamlJobConfiguration(contextKey, result);
-        return result;
-    }
-
-    private YamlPipelineDataSourceConfiguration buildYamlPipelineDataSourceConfiguration(final String type, final String param) {
-        YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
-        result.setType(type);
-        result.setParameter(param);
-        return result;
-    }
-
-    private PipelineDataSourceConfiguration buildTargetPipelineDataSourceConfiguration(final ShardingSphereDatabase targetDatabase) {
-        Map<String, Map<String, Object>> targetPoolProps = new HashMap<>();
-        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
-        for (Entry<String, StorageUnit> entry : targetDatabase.getResourceMetaData().getStorageUnits().entrySet()) {
-            targetPoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
-        }
-        YamlRootConfiguration targetRootConfig = buildYamlRootConfiguration(targetDatabase.getName(), targetPoolProps, targetDatabase.getRuleMetaData().getConfigurations());
-        return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
-    }
-
-    private YamlRootConfiguration buildYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final Collection<RuleConfiguration> rules) {
-        ShardingSpherePreconditions.checkState(!rules.isEmpty(), () -> new NoAnyRuleExistsException(databaseName));
-        YamlRootConfiguration result = new YamlRootConfiguration();
-        result.setDatabaseName(databaseName);
-        result.setDataSources(yamlDataSources);
-        result.setRules(new YamlRuleConfigurationSwapperEngine().swapToYamlRuleConfigurations(rules));
-        return result;
-    }
-
-    private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<DataNode>> sourceDataNodes) {
-        Map<String, String> result = new LinkedHashMap<>();
-        sourceDataNodes.forEach((tableName, dataNodes) -> result.put(tableName, dataNodes.get(0).getSchemaName()));
-        return result;
-    }
-
- /**
- * Add migration source resources.
- *
- * @param contextKey context key
- * @param propsMap data source pool properties map
- */
-    public void addMigrationSourceResources(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
-        Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, jobOption.getType());
-        Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F);
-        for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
-            if (existDataSources.containsKey(entry.getKey())) {
-                duplicateDataSourceNames.add(entry.getKey());
-            }
-        }
-        ShardingSpherePreconditions.checkState(duplicateDataSourceNames.isEmpty(), () -> new RegisterMigrationSourceStorageUnitException(duplicateDataSourceNames));
-        Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(existDataSources);
-        result.putAll(propsMap);
-        dataSourcePersistService.persist(contextKey, jobOption.getType(), result);
-    }
-
- /**
- * Drop migration source resources.
- *
- * @param contextKey context key
- * @param resourceNames resource names
- */
-    public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection<String> resourceNames) {
-        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, jobOption.getType());
-        List<String> noExistResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList());
-        ShardingSpherePreconditions.checkState(noExistResources.isEmpty(), () -> new UnregisterMigrationSourceStorageUnitException(noExistResources));
-        for (String each : resourceNames) {
-            metaDataDataSource.remove(each);
-        }
-        dataSourcePersistService.persist(contextKey, jobOption.getType(), metaDataDataSource);
-    }
-
- /**
- * Query migration source resources list.
- *
- * @param contextKey context key
- * @return migration source resources
- */
-    public Collection<Collection<Object>> listMigrationSourceResources(final PipelineContextKey contextKey) {
-        Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, jobOption.getType());
-        Collection<Collection<Object>> result = new ArrayList<>(propsMap.size());
-        for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
-            String dataSourceName = entry.getKey();
-            DataSourcePoolProperties value = entry.getValue();
-            Collection<Object> props = new LinkedList<>();
-            props.add(dataSourceName);
-            String url = String.valueOf(value.getConnectionPropertySynonyms().getStandardProperties().get("url"));
-            DatabaseType databaseType = DatabaseTypeFactory.get(url);
-            props.add(databaseType.getType());
-            ConnectionProperties connectionProps = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, databaseType).parse(url, "", null);
-            props.add(connectionProps.getHostname());
-            props.add(connectionProps.getPort());
-            props.add(connectionProps.getCatalog());
-            Map<String, Object> standardProps = value.getPoolPropertySynonyms().getStandardProperties();
-            props.add(getStandardProperty(standardProps, "connectionTimeoutMilliseconds"));
-            props.add(getStandardProperty(standardProps, "idleTimeoutMilliseconds"));
-            props.add(getStandardProperty(standardProps, "maxLifetimeMilliseconds"));
-            props.add(getStandardProperty(standardProps, "maxPoolSize"));
-            props.add(getStandardProperty(standardProps, "minPoolSize"));
-            props.add(getStandardProperty(standardProps, "readOnly"));
-            Map<String, Object> otherProps = value.getCustomProperties().getProperties();
-            props.add(otherProps.isEmpty() ? "" : JsonUtils.toJsonString(otherProps));
-            result.add(props);
-        }
-        return result;
-    }
-
-    private String getStandardProperty(final Map<String, Object> standardProps, final String key) {
- if (standardProps.containsKey(key) && null != standardProps.get(key)) {
- return standardProps.get(key).toString();
- }
- return "";
- }
-}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 32e144ab645..fe0d5bb5576 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -37,10 +37,10 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFacto
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
@@ -96,8 +96,6 @@ class MigrationJobAPITest {
private static MigrationJobAPI jobAPI;
- private static MigrationJobManager migrationJobManager;
-
private static PipelineJobConfigurationManager jobConfigManager;
private static PipelineJobManager jobManager;
@@ -112,8 +110,7 @@ class MigrationJobAPITest {
static void beforeClass() {
PipelineContextUtils.mockModeConfigAndContextManager();
jobOption = new MigrationJobOption();
- jobAPI = new MigrationJobAPI();
- migrationJobManager = new MigrationJobManager(jobOption);
+        jobAPI = (MigrationJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
jobConfigManager = new PipelineJobConfigurationManager(jobOption);
jobManager = new PipelineJobManager(jobOption);
transmissionJobManager = new TransmissionJobManager(jobOption);
@@ -124,13 +121,12 @@ class MigrationJobAPITest {
props.put("jdbcUrl", jdbcUrl);
props.put("username", "root");
props.put("password", "root");
-        migrationJobManager.addMigrationSourceResources(
-                PipelineContextUtils.getContextKey(), Collections.singletonMap("ds_0", new DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
+        jobAPI.addMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonMap("ds_0", new DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
     }
     @AfterAll
     static void afterClass() {
-        migrationJobManager.dropMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonList("ds_0"));
+        jobAPI.dropMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonList("ds_0"));
}
@Test
@@ -258,20 +254,20 @@ class MigrationJobAPITest {
void assertCreateJobConfigFailedOnMoreThanOneSourceTable() {
         List<SourceTargetEntry> sourceTargetEntries = Stream.of("t_order_0", "t_order_1")
                 .map(each -> new SourceTargetEntry("logic_db", new DataNode("ds_0", each), "t_order")).collect(Collectors.toList());
-        assertThrows(PipelineInvalidParameterException.class, () -> migrationJobManager.start(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db")));
+        assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.start(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db")));
}
@Test
void assertCreateJobConfigFailedOnDataSourceNotExist() {
         List<SourceTargetEntry> sourceTargetEntries = Collections.singletonList(new SourceTargetEntry("logic_db", new DataNode("ds_not_exists", "t_order"), "t_order"));
-        assertThrows(PipelineInvalidParameterException.class, () -> migrationJobManager.start(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db")));
+        assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.start(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db")));
}
@Test
void assertCreateJobConfig() throws SQLException {
initIntPrimaryEnvironment();
         SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
-        String jobId = migrationJobManager.start(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db"));
+        String jobId = jobAPI.start(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db"));
         MigrationJobConfiguration actual = jobConfigManager.getJobConfiguration(jobId);
assertThat(actual.getTargetDatabaseName(), is("logic_db"));
List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
@@ -299,7 +295,7 @@ class MigrationJobAPITest {
@Test
void assertShowMigrationSourceResources() {
-        Collection<Collection<Object>> actual = migrationJobManager.listMigrationSourceResources(PipelineContextUtils.getContextKey());
+        Collection<Collection<Object>> actual = jobAPI.listMigrationSourceResources(PipelineContextUtils.getContextKey());
assertThat(actual.size(), is(1));
Collection<Object> objects = actual.iterator().next();
assertThat(objects.toArray()[0], is("ds_0"));