This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3b15d531953 Improve DDL generate method at migration prepare (#20245)
3b15d531953 is described below
commit 3b15d53195316122e703b98c0476a4863326732f
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Aug 18 12:48:04 2022 +0800
Improve DDL generate method at migration prepare (#20245)
* Revise #20207
* Improve parameter name
* Fix codestyle
---
.../metadata/generator/PipelineDDLGenerator.java | 61 +++++++++++-----------
.../datasource/AbstractDataSourcePreparer.java | 33 +++++++-----
.../datasource/PrepareTargetTablesParameter.java | 26 ++++++---
.../scenario/migration/MigrationJobPreparer.java | 32 +++++++++++-
.../datasource/MySQLDataSourcePreparer.java | 31 +----------
.../datasource/OpenGaussDataSourcePreparer.java | 26 +--------
.../datasource/PostgreSQLDataSourcePreparer.java | 27 +---------
7 files changed, 106 insertions(+), 130 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index ca07db51c9a..812dc1bbced 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -17,10 +17,10 @@
package org.apache.shardingsphere.data.pipeline.core.metadata.generator;
-import javax.sql.DataSource;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.spi.ddlgenerator.CreateTableSQLGenerator;
import
org.apache.shardingsphere.data.pipeline.spi.ddlgenerator.CreateTableSQLGeneratorFactory;
import org.apache.shardingsphere.infra.binder.LogicSQL;
import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
@@ -33,11 +33,11 @@ import
org.apache.shardingsphere.infra.binder.type.ConstraintAvailable;
import org.apache.shardingsphere.infra.binder.type.IndexAvailable;
import org.apache.shardingsphere.infra.binder.type.TableAvailable;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.schema.util.IndexMetaDataUtil;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
+import
org.apache.shardingsphere.infra.util.spi.exception.ServiceProviderNotFoundException;
import org.apache.shardingsphere.sql.parser.sql.common.segment.SQLSegment;
import
org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.constraint.ConstraintSegment;
import
org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.index.IndexSegment;
@@ -45,6 +45,8 @@ import
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.Sim
import
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.TableNameSegment;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import javax.sql.DataSource;
+import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
@@ -69,8 +71,8 @@ public final class PipelineDDLGenerator {
/**
* Generate logic ddl sql.
*
- * @param database database
- * @param dataSourceName data source name
+ * @param sourceDataSource source data source
+ * @param databaseType database type
* @param schemaName schema name
* @param logicTableName table name
* @param actualTableName actual table name
@@ -78,16 +80,13 @@ public final class PipelineDDLGenerator {
* @return ddl SQL
*/
@SneakyThrows
- public String generateLogicDDLSQL(final ShardingSphereDatabase database,
final String dataSourceName, final String schemaName,
- final String logicTableName, final
String actualTableName,
+ public String generateLogicDDLSQL(final DataSource sourceDataSource, final
DatabaseType databaseType, final String schemaName, final String
logicTableName, final String actualTableName,
final ShardingSphereSQLParserEngine
parserEngine) {
- DatabaseType databaseType = database.getProtocolType();
- log.info("generateLogicDDLSQL, databaseType={}, databaseName={},
schemaName={}, tableName={}, dataSourceNames={}",
- databaseType.getType(), database.getName(), schemaName,
logicTableName, database.getResource().getDataSources().keySet());
- Collection<String> multiSQL = generateActualDDLSQL(databaseType,
schemaName, actualTableName,
database.getResource().getDataSources().get(dataSourceName));
+ log.info("generateLogicDDLSQL, databaseType={}, schemaName={},
tableName={}", databaseType.getType(), schemaName, logicTableName);
+ Collection<String> multiSQL = generateActualDDLSQL(databaseType,
schemaName, actualTableName, sourceDataSource);
StringBuilder result = new StringBuilder();
for (String each : multiSQL) {
- Optional<String> logicSQL = decorate(databaseType,
database.getName(), schemaName, database, each, parserEngine);
+ Optional<String> logicSQL = decorate(databaseType, schemaName,
sourceDataSource, each, logicTableName, parserEngine);
logicSQL.ifPresent(ddlSQL ->
result.append(ddlSQL).append(DELIMITER).append(System.lineSeparator()));
}
return result.toString();
@@ -118,12 +117,16 @@ public final class PipelineDDLGenerator {
return sql;
}
- private Optional<String> decorate(final DatabaseType databaseType, final
String databaseName, final String schemaName, final ShardingSphereDatabase
database, final String sql,
- final ShardingSphereSQLParserEngine
parserEngine) {
+ private Optional<String> decorate(final DatabaseType databaseType, final
String schemaName, final DataSource dataSource, final String sql, final String
logicTableName,
+ final ShardingSphereSQLParserEngine
parserEngine) throws SQLException {
if (sql.trim().isEmpty()) {
return Optional.empty();
}
- String result = decorateActualSQL(sql.trim(), database, databaseName,
parserEngine);
+ String databaseName;
+ try (Connection connection = dataSource.getConnection()) {
+ databaseName = connection.getCatalog();
+ }
+ String result = decorateActualSQL(sql.trim(), logicTableName,
databaseName, parserEngine);
// TODO remove it after set search_path is supported.
if ("openGauss".equals(databaseType.getType())) {
return decorateOpenGauss(databaseName, schemaName, result,
parserEngine);
@@ -132,39 +135,38 @@ public final class PipelineDDLGenerator {
}
private Collection<String> generateActualDDLSQL(final DatabaseType
databaseType, final String schemaName, final String actualTable, final
DataSource dataSource) throws SQLException {
- return
CreateTableSQLGeneratorFactory.findInstance(databaseType).orElseThrow(() -> new
ShardingSphereException("Failed to get dialect ddl sql generator"))
+ return
CreateTableSQLGeneratorFactory.findInstance(databaseType).orElseThrow(() -> new
ServiceProviderNotFoundException(CreateTableSQLGenerator.class,
databaseType.getType()))
.generate(actualTable, schemaName, dataSource);
}
- private String decorateActualSQL(final String sql, final
ShardingSphereDatabase database, final String databaseName, final
ShardingSphereSQLParserEngine parserEngine) {
+ private String decorateActualSQL(final String sql, final String
logicTableName, final String databaseName, final ShardingSphereSQLParserEngine
parserEngine) {
LogicSQL logicSQL = getLogicSQL(sql, databaseName, parserEngine);
SQLStatementContext<?> sqlStatementContext =
logicSQL.getSqlStatementContext();
Map<SQLSegment, String> replaceMap = new
TreeMap<>(Comparator.comparing(SQLSegment::getStartIndex));
if (sqlStatementContext instanceof CreateTableStatementContext) {
- appendFromIndexAndConstraint(replaceMap, database,
sqlStatementContext);
- appendFromTable(replaceMap, database, (TableAvailable)
sqlStatementContext);
+ appendFromIndexAndConstraint(replaceMap, logicTableName,
sqlStatementContext);
+ appendFromTable(replaceMap, logicTableName, (TableAvailable)
sqlStatementContext);
}
if (sqlStatementContext instanceof CommentStatementContext) {
- appendFromTable(replaceMap, database, (TableAvailable)
sqlStatementContext);
+ appendFromTable(replaceMap, logicTableName, (TableAvailable)
sqlStatementContext);
}
if (sqlStatementContext instanceof CreateIndexStatementContext) {
- appendFromTable(replaceMap, database, (TableAvailable)
sqlStatementContext);
- appendFromIndexAndConstraint(replaceMap, database,
sqlStatementContext);
+ appendFromTable(replaceMap, logicTableName, (TableAvailable)
sqlStatementContext);
+ appendFromIndexAndConstraint(replaceMap, logicTableName,
sqlStatementContext);
}
if (sqlStatementContext instanceof AlterTableStatementContext) {
- appendFromIndexAndConstraint(replaceMap, database,
sqlStatementContext);
- appendFromTable(replaceMap, database, (TableAvailable)
sqlStatementContext);
+ appendFromIndexAndConstraint(replaceMap, logicTableName,
sqlStatementContext);
+ appendFromTable(replaceMap, logicTableName, (TableAvailable)
sqlStatementContext);
}
return doDecorateActualTable(replaceMap, sql);
}
- private void appendFromIndexAndConstraint(final Map<SQLSegment, String>
replaceMap, final ShardingSphereDatabase database, final SQLStatementContext<?>
sqlStatementContext) {
+ private void appendFromIndexAndConstraint(final Map<SQLSegment, String>
replaceMap, final String logicTableName, final SQLStatementContext<?>
sqlStatementContext) {
if (!(sqlStatementContext instanceof TableAvailable) ||
((TableAvailable)
sqlStatementContext).getTablesContext().getTables().isEmpty()) {
return;
}
TableNameSegment tableNameSegment = ((TableAvailable)
sqlStatementContext).getTablesContext().getTables().iterator().next().getTableName();
- String logicTable = findLogicTable(tableNameSegment, database);
- if (!tableNameSegment.getIdentifier().getValue().equals(logicTable)) {
+ if
(!tableNameSegment.getIdentifier().getValue().equals(logicTableName)) {
if (sqlStatementContext instanceof IndexAvailable) {
for (IndexSegment each : ((IndexAvailable)
sqlStatementContext).getIndexes()) {
String logicIndexName =
IndexMetaDataUtil.getLogicIndexName(each.getIndexName().getIdentifier().getValue(),
tableNameSegment.getIdentifier().getValue());
@@ -180,11 +182,10 @@ public final class PipelineDDLGenerator {
}
}
- private void appendFromTable(final Map<SQLSegment, String> replaceMap,
final ShardingSphereDatabase database, final TableAvailable
sqlStatementContext) {
+ private void appendFromTable(final Map<SQLSegment, String> replaceMap,
final String logicTableName, final TableAvailable sqlStatementContext) {
for (SimpleTableSegment each : sqlStatementContext.getAllTables()) {
- String logicTable = findLogicTable(each.getTableName(), database);
- if
(!logicTable.equals(each.getTableName().getIdentifier().getValue())) {
- replaceMap.put(each.getTableName(), logicTable);
+ if
(!logicTableName.equals(each.getTableName().getIdentifier().getValue())) {
+ replaceMap.put(each.getTableName(), logicTableName);
}
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 15cf5495073..191f9a8bc3f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -17,27 +17,30 @@
package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
-import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
+import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-import org.apache.shardingsphere.infra.datanode.DataNodes;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
/**
* Abstract data source preparer.
@@ -57,7 +60,7 @@ public abstract class AbstractDataSourcePreparer implements
DataSourcePreparer {
String defaultSchema =
DatabaseTypeEngine.getDefaultSchemaName(parameter.getTargetDatabaseType(),
parameter.getDatabaseName());
log.info("prepareTargetSchemas, schemaNames={}, defaultSchema={}",
schemaNames, defaultSchema);
PipelineSQLBuilder pipelineSQLBuilder =
PipelineSQLBuilderFactory.getInstance(parameter.getTargetDatabaseType().getType());
- try (Connection targetConnection =
getTargetCachedDataSource(parameter.getDataSourceConfig(),
parameter.getDataSourceManager()).getConnection()) {
+ try (Connection targetConnection =
getCachedDataSource(parameter.getDataSourceConfig(),
parameter.getDataSourceManager()).getConnection()) {
for (String each : schemaNames) {
if (each.equalsIgnoreCase(defaultSchema)) {
continue;
@@ -91,7 +94,7 @@ public abstract class AbstractDataSourcePreparer implements
DataSourcePreparer {
return
dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(),
jobConfig.getSource().getParameter()));
}
- protected final PipelineDataSourceWrapper getTargetCachedDataSource(final
PipelineDataSourceConfiguration dataSourceConfig, final
PipelineDataSourceManager dataSourceManager) {
+ protected final PipelineDataSourceWrapper getCachedDataSource(final
PipelineDataSourceConfiguration dataSourceConfig, final
PipelineDataSourceManager dataSourceManager) {
return dataSourceManager.getDataSource(dataSourceConfig);
}
@@ -116,11 +119,17 @@ public abstract class AbstractDataSourcePreparer
implements DataSourcePreparer {
return
PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT
EXISTS ");
}
- protected String getActualTable(final ShardingSphereDatabase database,
final String tableName) {
- DataNodes dataNodes = new
DataNodes(database.getRuleMetaData().getRules());
- Optional<DataNode> filteredDataNode =
dataNodes.getDataNodes(tableName).stream()
- .filter(each ->
database.getResource().getDataSources().containsKey(each.getDataSourceName().contains(".")
? each.getDataSourceName().split("\\.")[0] : each.getDataSourceName()))
- .findFirst();
- return filteredDataNode.map(DataNode::getTableName).orElse(tableName);
+ protected List<String> listCreateLogicalTableSQL(final
PrepareTargetTablesParameter parameter) {
+ PipelineDDLGenerator generator = new PipelineDDLGenerator();
+ List<String> result = new LinkedList<>();
+ for (JobDataNodeEntry each :
parameter.getTablesFirstDataNodes().getEntries()) {
+ String schemaName =
parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
+ String dataSourceName =
each.getDataNodes().get(0).getDataSourceName();
+ DataSource dataSource =
parameter.getSourceDataSourceMap().get(dataSourceName);
+ DatabaseType databaseType =
DatabaseTypeEngine.getDatabaseType(Collections.singletonList(dataSource));
+ String actualTableName =
parameter.getTableNameMap().get(each.getLogicTableName());
+ result.add(generator.generateLogicDDLSQL(dataSource, databaseType,
schemaName, each.getLogicTableName(), actualTableName,
parameter.getSqlParserEngine()));
+ }
+ return result;
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
index 9aaf034ba3a..ffda71fcb52 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
@@ -23,6 +23,10 @@ import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMap
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
+
+import javax.sql.DataSource;
+import java.util.Map;
/**
* Prepare target tables parameter.
@@ -34,19 +38,29 @@ public final class PrepareTargetTablesParameter {
private final JobDataNodeLine tablesFirstDataNodes;
- private final PipelineDataSourceConfiguration dataSourceConfig;
+ private final PipelineDataSourceConfiguration targetDataSourceConfig;
+
+ private final Map<String, DataSource> sourceDataSourceMap;
private final PipelineDataSourceManager dataSourceManager;
+ private final Map<String, String> tableNameMap;
+
private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
- public PrepareTargetTablesParameter(@NonNull final String databaseName,
@NonNull final PipelineDataSourceConfiguration dataSourceConfig,
- @NonNull final
PipelineDataSourceManager dataSourceManager,
- @NonNull final String
tablesFirstDataNodes, final TableNameSchemaNameMapping
tableNameSchemaNameMapping) {
+ private final ShardingSphereSQLParserEngine sqlParserEngine;
+
+ public PrepareTargetTablesParameter(@NonNull final String databaseName,
@NonNull final PipelineDataSourceConfiguration targetDataSourceConfig,
+ @NonNull final Map<String, DataSource>
sourceDataSourceMap, @NonNull final PipelineDataSourceManager dataSourceManager,
+ @NonNull final JobDataNodeLine
tablesFirstDataNodes, final Map<String, String> tableNameMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping,
+ @NonNull final
ShardingSphereSQLParserEngine sqlParserEngine) {
this.databaseName = databaseName;
- this.dataSourceConfig = dataSourceConfig;
- this.tablesFirstDataNodes =
JobDataNodeLine.unmarshal(tablesFirstDataNodes);
+ this.targetDataSourceConfig = targetDataSourceConfig;
+ this.sourceDataSourceMap = sourceDataSourceMap;
+ this.tablesFirstDataNodes = tablesFirstDataNodes;
this.dataSourceManager = dataSourceManager;
+ this.tableNameMap = tableNameMap;
this.tableNameSchemaNameMapping = tableNameSchemaNameMapping;
+ this.sqlParserEngine = sqlParserEngine;
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index e1fbed498b5..384f9fdd1a0 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -21,6 +21,8 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -40,12 +42,21 @@ import
org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.datanode.DataNodes;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
+import org.apache.shardingsphere.parser.rule.SQLParserRule;
import java.sql.SQLException;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
/**
* Migration job preparer.
@@ -133,9 +144,18 @@ public final class MigrationJobPreparer {
jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(),
jobItemContext.getDataSourceManager(), tableNameSchemaNameMapping);
PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType,
prepareTargetSchemasParameter);
}
+ ShardingSphereMetaData metaData =
PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
+ ShardingSphereDatabase sphereDatabase =
metaData.getDatabases().get(jobConfig.getDatabaseName());
+ ShardingSphereSQLParserEngine sqlParserEngine =
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(sphereDatabase.getProtocolType().getType());
+ JobDataNodeLine jobDataNodeLine =
JobDataNodeLine.unmarshal(jobConfig.getTablesFirstDataNodes());
+ Map<String, String> tableNameMap = new HashMap<>();
+ for (JobDataNodeEntry each : jobDataNodeLine.getEntries()) {
+ String actualTableName = getActualTable(sphereDatabase,
each.getLogicTableName());
+ tableNameMap.put(each.getLogicTableName(), actualTableName);
+ }
PrepareTargetTablesParameter prepareTargetTablesParameter = new
PrepareTargetTablesParameter(jobConfig.getDatabaseName(),
-
jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(),
- jobItemContext.getDataSourceManager(),
jobConfig.getTablesFirstDataNodes(), tableNameSchemaNameMapping);
+
jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(),
sphereDatabase.getResource().getDataSources(),
jobItemContext.getDataSourceManager(),
+ jobDataNodeLine, tableNameMap, tableNameSchemaNameMapping,
sqlParserEngine);
PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType,
prepareTargetTablesParameter);
}
@@ -149,6 +169,14 @@ public final class MigrationJobPreparer {
return true;
}
+ private String getActualTable(final ShardingSphereDatabase database, final
String tableName) {
+ DataNodes dataNodes = new
DataNodes(database.getRuleMetaData().getRules());
+ Optional<DataNode> filteredDataNode =
dataNodes.getDataNodes(tableName).stream()
+ .filter(each ->
database.getResource().getDataSources().containsKey(each.getDataSourceName().contains(".")
? each.getDataSourceName().split("\\.")[0] : each.getDataSourceName()))
+ .findFirst();
+ return filteredDataNode.map(DataNode::getTableName).orElse(tableName);
+ }
+
private void initInventoryTasks(final MigrationJobItemContext
jobItemContext) {
InventoryTaskSplitter inventoryTaskSplitter = new
InventoryTaskSplitter(jobItemContext.getSourceMetaDataLoader(),
jobItemContext.getDataSourceManager(),
jobItemContext.getJobProcessContext().getImporterExecuteEngine(),
jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig(),
jobItemContext.getInitProgress());
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index 1f4c38b90a7..ceccccf1742 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -18,22 +18,13 @@
package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.LinkedList;
-import java.util.List;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import org.apache.shardingsphere.parser.rule.SQLParserRule;
/**
* Data source preparer for MySQL.
@@ -44,33 +35,15 @@ public final class MySQLDataSourcePreparer extends
AbstractDataSourcePreparer {
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter
parameter) {
PipelineDataSourceManager dataSourceManager =
parameter.getDataSourceManager();
- try (Connection targetConnection =
getTargetCachedDataSource(parameter.getDataSourceConfig(),
dataSourceManager).getConnection()) {
- for (String each : getCreateTableSQL(parameter)) {
+ try (Connection targetConnection =
getCachedDataSource(parameter.getTargetDataSourceConfig(),
dataSourceManager).getConnection()) {
+ for (String each : listCreateLogicalTableSQL(parameter)) {
executeTargetTableSQL(targetConnection,
addIfNotExistsForCreateTableSQL(each));
- log.info("create target table '{}' success", each);
}
} catch (final SQLException ex) {
throw new PipelineJobPrepareFailedException("prepare target tables
failed.", ex);
}
}
- private List<String> getCreateTableSQL(final PrepareTargetTablesParameter
parameter) {
- PipelineDDLGenerator generator = new PipelineDDLGenerator();
- ShardingSphereMetaData metaData =
PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
- ShardingSphereDatabase sphereDatabase =
metaData.getDatabases().get(parameter.getDatabaseName());
- ShardingSphereSQLParserEngine sqlParserEngine =
-
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
-
.getSQLParserEngine(sphereDatabase.getProtocolType().getType());
- List<String> result = new LinkedList<>();
- for (JobDataNodeEntry each :
parameter.getTablesFirstDataNodes().getEntries()) {
- String schemaName =
parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
- String dataSourceName =
each.getDataNodes().get(0).getDataSourceName();
- result.add(generator.generateLogicDDLSQL(sphereDatabase,
dataSourceName, schemaName, each.getLogicTableName(),
- getActualTable(sphereDatabase, each.getLogicTableName()),
sqlParserEngine));
- }
- return result;
- }
-
@Override
public String getType() {
return "MySQL";
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
index de5dcbd0a53..f42546f4471 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
@@ -20,22 +20,14 @@ package
org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource;
import com.google.common.base.Splitter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import org.apache.shardingsphere.parser.rule.SQLParserRule;
/**
* Data source preparer for openGauss.
@@ -46,7 +38,7 @@ public final class OpenGaussDataSourcePreparer extends
AbstractDataSourcePrepare
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter
parameter) {
List<String> createLogicalTableSQLs =
listCreateLogicalTableSQL(parameter);
- try (Connection targetConnection =
getTargetCachedDataSource(parameter.getDataSourceConfig(),
parameter.getDataSourceManager()).getConnection()) {
+ try (Connection targetConnection =
getCachedDataSource(parameter.getTargetDataSourceConfig(),
parameter.getDataSourceManager()).getConnection()) {
for (String createLogicalTableSQL : createLogicalTableSQLs) {
for (String each :
Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList()))
{
executeTargetTableSQL(targetConnection,
addIfNotExistsForCreateTableSQL(each));
@@ -57,22 +49,6 @@ public final class OpenGaussDataSourcePreparer extends
AbstractDataSourcePrepare
}
}
- private List<String> listCreateLogicalTableSQL(final
PrepareTargetTablesParameter parameter) {
- PipelineDDLGenerator generator = new PipelineDDLGenerator();
- ShardingSphereMetaData metaData =
PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
- ShardingSphereDatabase sphereDatabase =
metaData.getDatabases().get(parameter.getDatabaseName());
- ShardingSphereSQLParserEngine sqlParserEngine =
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
-
.getSQLParserEngine(sphereDatabase.getProtocolType().getType());
- List<String> result = new LinkedList<>();
- for (JobDataNodeEntry each :
parameter.getTablesFirstDataNodes().getEntries()) {
- String schemaName =
parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
- String dataSourceName =
each.getDataNodes().get(0).getDataSourceName();
- result.add(generator.generateLogicDDLSQL(sphereDatabase,
dataSourceName, schemaName, each.getLogicTableName(),
- getActualTable(sphereDatabase, each.getLogicTableName()),
sqlParserEngine));
- }
- return result;
- }
-
@Override
public String getType() {
return "openGauss";
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
index 61287d90bbb..589b1860065 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
@@ -20,22 +20,14 @@ package
org.apache.shardingsphere.data.pipeline.postgresql.prepare.datasource;
import com.google.common.base.Splitter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import org.apache.shardingsphere.parser.rule.SQLParserRule;
/**
* Data source preparer for PostgresSQL.
@@ -46,7 +38,7 @@ public final class PostgreSQLDataSourcePreparer extends
AbstractDataSourcePrepar
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter
parameter) {
List<String> createLogicalTableSQLs =
listCreateLogicalTableSQL(parameter);
- try (Connection targetConnection =
getTargetCachedDataSource(parameter.getDataSourceConfig(),
parameter.getDataSourceManager()).getConnection()) {
+ try (Connection targetConnection =
getCachedDataSource(parameter.getTargetDataSourceConfig(),
parameter.getDataSourceManager()).getConnection()) {
for (String createLogicalTableSQL : createLogicalTableSQLs) {
for (String each :
Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList()))
{
executeTargetTableSQL(targetConnection, each);
@@ -57,23 +49,6 @@ public final class PostgreSQLDataSourcePreparer extends
AbstractDataSourcePrepar
}
}
- private List<String> listCreateLogicalTableSQL(final
PrepareTargetTablesParameter parameter) {
- PipelineDDLGenerator generator = new PipelineDDLGenerator();
- ShardingSphereMetaData metaData =
PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
- ShardingSphereDatabase sphereDatabase =
metaData.getDatabases().get(parameter.getDatabaseName());
- ShardingSphereSQLParserEngine sqlParserEngine =
-
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
-
.getSQLParserEngine(sphereDatabase.getProtocolType().getType());
- List<String> result = new LinkedList<>();
- for (JobDataNodeEntry each :
parameter.getTablesFirstDataNodes().getEntries()) {
- String schemaName =
parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
- String dataSourceName =
each.getDataNodes().get(0).getDataSourceName();
- result.add(generator.generateLogicDDLSQL(sphereDatabase,
dataSourceName, schemaName, each.getLogicTableName(),
- getActualTable(sphereDatabase, each.getLogicTableName()),
sqlParserEngine));
- }
- return result;
- }
-
@Override
public String getType() {
return "PostgreSQL";